aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--klippy/extras/smart_effector.py154
1 files changed, 154 insertions, 0 deletions
diff --git a/klippy/extras/smart_effector.py b/klippy/extras/smart_effector.py
new file mode 100644
index 00000000..6076a246
--- /dev/null
+++ b/klippy/extras/smart_effector.py
@@ -0,0 +1,154 @@
+# SmartEffector support
+#
+# Copyright (C) 2021 Dmitry Butyugin <dmbutyugin@google.com>
+#
+# This file may be distributed under the terms of the GNU GPLv3 license.
+
+import logging
+from . import probe
+
+# SmartEffector communication protocol implemented here originates from
+# https://github.com/Duet3D/SmartEffectorFirmware
+BITS_PER_SECOND = 1000.
+
+class ControlPinHelper:
+ def __init__(self, pin_params):
+ self._mcu = pin_params['chip']
+ self._pin = pin_params['pin']
+ self._start_value = self._invert = pin_params['invert']
+ self._oid = None
+ self._set_cmd = None
+ self._mcu.register_config_callback(self._build_config)
+ def _build_config(self):
+ self._mcu.request_move_queue_slot()
+ self._oid = self._mcu.create_oid()
+ self._mcu.add_config_cmd(
+ "config_digital_out oid=%d pin=%s value=%d default_value=%d"
+ " max_duration=%d" % (self._oid, self._pin, self._start_value,
+ self._start_value, 0))
+ cmd_queue = self._mcu.alloc_command_queue()
+ self._set_cmd = self._mcu.lookup_command(
+ "queue_digital_out oid=%c clock=%u on_ticks=%u", cq=cmd_queue)
+ def write_bits(self, start_time, bit_stream):
+ bit_step = 1. / BITS_PER_SECOND
+ last_value = self._start_value
+ bit_time = start_time
+ for b in bit_stream:
+ value = (not not b) ^ self._invert
+ if value != last_value:
+ clock = self._mcu.print_time_to_clock(bit_time)
+ self._set_cmd.send([self._oid, clock, value])
+ last_value = value
+ bit_time += bit_step
+ # After the last bit, the signal on the control pin must go back
+ # to its start value.
+ if value != self._start_value:
+ clock = self._mcu.print_time_to_clock(bit_time)
+ self._set_cmd.send([self._oid, clock, self._start_value])
+ bit_time += bit_step
+ return bit_time
+
+class SmartEffectorEndstopWrapper:
+ def __init__(self, config):
+ self.printer = config.get_printer()
+ self.gcode = self.printer.lookup_object('gcode')
+ self.probe_accel = config.getfloat('probe_accel', 0., minval=0.)
+ self.recovery_time = config.getfloat('recovery_time', 0.4, minval=0.)
+ self.probe_wrapper = probe.ProbeEndstopWrapper(config)
+ # Wrappers
+ self.get_mcu = self.probe_wrapper.get_mcu
+ self.add_stepper = self.probe_wrapper.add_stepper
+ self.get_steppers = self.probe_wrapper.get_steppers
+ self.home_start = self.probe_wrapper.home_start
+ self.home_wait = self.probe_wrapper.home_wait
+ self.query_endstop = self.probe_wrapper.query_endstop
+ self.multi_probe_begin = self.probe_wrapper.multi_probe_begin
+ self.multi_probe_end = self.probe_wrapper.multi_probe_end
+ # SmartEffector control
+ control_pin = config.get('control_pin', None)
+ if control_pin:
+ ppins = self.printer.lookup_object('pins')
+ pin_params = ppins.lookup_pin(control_pin, can_invert=True)
+ self.control_pin = ControlPinHelper(pin_params)
+ self.gcode.register_command("RESET_SMART_EFFECTOR",
+ self.cmd_RESET_SMART_EFFECTOR,
+ desc=self.cmd_RESET_SMART_EFFECTOR_help)
+ else:
+ self.control_pin = None
+ self.gcode.register_command("SET_SMART_EFFECTOR",
+ self.cmd_SET_SMART_EFFECTOR,
+ desc=self.cmd_SET_SMART_EFFECTOR_help)
+ def probe_prepare(self, hmove):
+ toolhead = self.printer.lookup_object('toolhead')
+ self.probe_wrapper.probe_prepare(hmove)
+ if self.probe_accel:
+ systime = self.printer.get_reactor().monotonic()
+ toolhead_info = toolhead.get_status(systime)
+ self.old_max_accel = toolhead_info['max_accel']
+ self.gcode.run_script_from_command(
+ "M204 S%.3f" % (self.probe_accel,))
+ if self.recovery_time:
+ toolhead.dwell(self.recovery_time)
+ def probe_finish(self, hmove):
+ if self.probe_accel:
+ self.gcode.run_script_from_command(
+ "M204 S%.3f" % (self.old_max_accel,))
+ self.probe_wrapper.probe_finish(hmove)
+ def _send_command(self, buf):
+ # Each byte is sent to the SmartEffector as
+ # [0 0 1 0 b7 b6 b5 b4 !b4 b3 b2 b1 b0 !b0]
+ bit_stream = []
+ for b in buf:
+ b = b & 0xFF
+ bit_stream.extend([0, 0, 1, 0])
+ bit_stream.extend([b & 0x80, b & 0x40, b & 0x20, b & 0x10])
+ bit_stream.append((~b) & 0x10)
+ bit_stream.extend([b & 0x08, b & 0x04, b & 0x02, b & 0x01])
+ bit_stream.append((~b) & 0x01)
+ # Wait for previous actions to finish
+ toolhead = self.printer.lookup_object('toolhead')
+ toolhead.wait_moves()
+ start_time = toolhead.get_last_move_time()
+ # Write generated bits to the control pin
+ end_time = self.control_pin.write_bits(start_time, bit_stream)
+ # Dwell to make sure no subseqent actions are queued together
+ # with the SmartEffector programming
+ toolhead.dwell(end_time - start_time)
+ toolhead.wait_moves()
+ cmd_SET_SMART_EFFECTOR_help = 'Set SmartEffector parameters'
+ def cmd_SET_SMART_EFFECTOR(self, gcmd):
+ sensitivity = gcmd.get_int('SENSITIVITY', None, minval=0, maxval=255)
+ respond_info = []
+ if sensitivity is not None:
+ if self.control_pin is not None:
+ buf = [105, sensitivity, 255 - sensitivity]
+ self._send_command(buf)
+ respond_info.append("sensitivity: %d" % (sensitivity,))
+ else:
+ raise gcmd.error("control_pin must be set in [smart_effector] "
+ "for sensitivity programming")
+ self.probe_accel = gcmd.get_float('ACCEL', self.probe_accel, minval=0.)
+ self.recovery_time = gcmd.get_float('RECOVERY_TIME', self.recovery_time,
+ minval=0.)
+ if self.probe_accel:
+ respond_info.append(
+ "probing accelartion: %.3f" % (self.probe_accel,))
+ else:
+ respond_info.append("probing acceleration control disabled")
+ if self.recovery_time:
+ respond_info.append(
+ "probe recovery time: %.3f" % (self.recovery_time,))
+ else:
+ respond_info.append("probe recovery time disabled")
+ gcmd.respond_info("SmartEffector:\n" + "\n".join(respond_info))
+ cmd_RESET_SMART_EFFECTOR_help = 'Reset SmartEffector settings (sensitivity)'
+ def cmd_RESET_SMART_EFFECTOR(self, gcmd):
+ buf = [131, 131]
+ self._send_command(buf)
+ gcmd.respond_info('SmartEffector sensitivity was reset')
+
+def load_config(config):
+ smart_effector = SmartEffectorEndstopWrapper(config)
+ config.get_printer().add_object('probe',
+ probe.PrinterProbe(config, smart_effector))
+ return smart_effector