diff options
Diffstat (limited to 'klippy/extras/output_pin.py')
-rw-r--r-- | klippy/extras/output_pin.py | 101 |
1 files changed, 62 insertions, 39 deletions
diff --git a/klippy/extras/output_pin.py b/klippy/extras/output_pin.py index bde7ea69..85a9e91c 100644 --- a/klippy/extras/output_pin.py +++ b/klippy/extras/output_pin.py @@ -11,6 +11,7 @@ from .display import display # G-Code request queuing helper ###################################################################### + # Helper code to queue g-code requests class GCodeRequestQueue: def __init__(self, config, mcu, callback): @@ -18,12 +19,14 @@ class GCodeRequestQueue: self.mcu = mcu self.callback = callback self.rqueue = [] - self.next_min_flush_time = 0. + self.next_min_flush_time = 0.0 self.toolhead = None mcu.register_flush_callback(self._flush_notification) printer.register_event_handler("klippy:connect", self._handle_connect) + def _handle_connect(self): - self.toolhead = self.printer.lookup_object('toolhead') + self.toolhead = self.printer.lookup_object("toolhead") + def _flush_notification(self, print_time, clock): min_sched_time = self.mcu.min_schedule_time() rqueue = self.rqueue @@ -37,26 +40,30 @@ class GCodeRequestQueue: pos += 1 req_pt, req_val = rqueue[pos] # Invoke callback for the request - min_wait = 0. + min_wait = 0.0 ret = self.callback(next_time, req_val) if ret is not None: # Handle special cases action, min_wait = ret if action == "discard": - del rqueue[:pos+1] + del rqueue[: pos + 1] continue if action == "delay": pos -= 1 - del rqueue[:pos+1] + del rqueue[: pos + 1] self.next_min_flush_time = next_time + max(min_wait, min_sched_time) # Ensure following queue items are flushed self.toolhead.note_mcu_movequeue_activity(self.next_min_flush_time) + def _queue_request(self, print_time, value): self.rqueue.append((print_time, value)) self.toolhead.note_mcu_movequeue_activity(print_time) + def queue_gcode_request(self, value): self.toolhead.register_lookahead_callback( - (lambda pt: self._queue_request(pt, value))) + (lambda pt: self._queue_request(pt, value)) + ) + def send_async_request(self, value, print_time=None): min_sched_time = self.mcu.min_schedule_time() if print_time is None: @@ -65,7 +72,7 @@ class GCodeRequestQueue: while 1: next_time = max(print_time, self.next_min_flush_time) # Invoke callback for the request - action, min_wait = "normal", 0. + action, min_wait = "normal", 0.0 ret = self.callback(next_time, value) if ret is not None: # Handle special cases @@ -84,6 +91,7 @@ class GCodeRequestQueue: # Time between each template update RENDER_TIME = 0.500 + # Main template evaluation code class PrinterTemplateEvaluator: def __init__(self, config): @@ -95,11 +103,13 @@ class PrinterTemplateEvaluator: self.templates = dtemplates.get_display_templates() gcode_macro = self.printer.load_object(config, "gcode_macro") self.create_template_context = gcode_macro.create_template_context + def _activate_timer(self): if self.render_timer is not None or not self.active_templates: return reactor = self.printer.get_reactor() self.render_timer = reactor.register_timer(self._render, reactor.NOW) + def _activate_template(self, callback, template, lparams, flush_callback): if template is not None: # Build a unique id to make it possible to cache duplicate rendering @@ -109,11 +119,11 @@ class PrinterTemplateEvaluator: except TypeError as e: # lparams is not static, so disable caching uid = None - self.active_templates[callback] = ( - uid, template, lparams, flush_callback) + self.active_templates[callback] = (uid, template, lparams, flush_callback) return if callback in self.active_templates: del self.active_templates[callback] + def _render(self, eventtime): if not self.active_templates: # Nothing to do - unregister timer @@ -123,9 +133,11 @@ class PrinterTemplateEvaluator: return reactor.NEVER # Setup gcode_macro template context context = self.create_template_context(eventtime) + def render(name, **kwargs): return self.templates[name].render(context, **kwargs) - context['render'] = render + + context["render"] = render # Render all templates flush_callbacks = {} render_cache = {} @@ -143,11 +155,12 @@ class PrinterTemplateEvaluator: if flush_callback is not None: flush_callbacks[flush_callback] = 1 callback(text) - context.clear() # Remove circular references for better gc + context.clear() # Remove circular references for better gc # Invoke optional flush callbacks for flush_callback in flush_callbacks.keys(): flush_callback() return eventtime + RENDER_TIME + def set_template(self, gcmd, callback, flush_callback=None): template = None lparams = {} @@ -162,8 +175,7 @@ class PrinterTemplateEvaluator: continue p = p.lower() if p not in tparams: - raise gcmd.error("Invalid display_template parameter: %s" - % (p,)) + raise gcmd.error("Invalid display_template parameter: %s" % (p,)) try: lparams[p] = ast.literal_eval(v) except ValueError as e: @@ -171,6 +183,7 @@ class PrinterTemplateEvaluator: self._activate_template(callback, template, lparams, flush_callback) self._activate_timer() + def lookup_template_eval(config): printer = config.get_printer() te = printer.lookup_object("template_evaluator", None) @@ -184,62 +197,71 @@ def lookup_template_eval(config): # Main output pin handling ###################################################################### + class PrinterOutputPin: def __init__(self, config): self.printer = config.get_printer() - ppins = self.printer.lookup_object('pins') + ppins = self.printer.lookup_object("pins") # Determine pin type - self.is_pwm = config.getboolean('pwm', False) + self.is_pwm = config.getboolean("pwm", False) if self.is_pwm: - self.mcu_pin = ppins.setup_pin('pwm', config.get('pin')) + self.mcu_pin = ppins.setup_pin("pwm", config.get("pin")) max_duration = self.mcu_pin.get_mcu().max_nominal_duration() - cycle_time = config.getfloat('cycle_time', 0.100, above=0., - maxval=max_duration) - hardware_pwm = config.getboolean('hardware_pwm', False) + cycle_time = config.getfloat( + "cycle_time", 0.100, above=0.0, maxval=max_duration + ) + hardware_pwm = config.getboolean("hardware_pwm", False) self.mcu_pin.setup_cycle_time(cycle_time, hardware_pwm) - self.scale = config.getfloat('scale', 1., above=0.) + self.scale = config.getfloat("scale", 1.0, above=0.0) else: - self.mcu_pin = ppins.setup_pin('digital_out', config.get('pin')) - self.scale = 1. - self.mcu_pin.setup_max_duration(0.) + self.mcu_pin = ppins.setup_pin("digital_out", config.get("pin")) + self.scale = 1.0 + self.mcu_pin.setup_max_duration(0.0) # Determine start and shutdown values - self.last_value = config.getfloat( - 'value', 0., minval=0., maxval=self.scale) / self.scale - self.shutdown_value = config.getfloat( - 'shutdown_value', 0., minval=0., maxval=self.scale) / self.scale + self.last_value = ( + config.getfloat("value", 0.0, minval=0.0, maxval=self.scale) / self.scale + ) + self.shutdown_value = ( + config.getfloat("shutdown_value", 0.0, minval=0.0, maxval=self.scale) + / self.scale + ) self.mcu_pin.setup_start_value(self.last_value, self.shutdown_value) # Create gcode request queue - self.gcrq = GCodeRequestQueue(config, self.mcu_pin.get_mcu(), - self._set_pin) + self.gcrq = GCodeRequestQueue(config, self.mcu_pin.get_mcu(), self._set_pin) # Template handling self.template_eval = lookup_template_eval(config) # Register commands pin_name = config.get_name().split()[1] - gcode = self.printer.lookup_object('gcode') - gcode.register_mux_command("SET_PIN", "PIN", pin_name, - self.cmd_SET_PIN, - desc=self.cmd_SET_PIN_help) + gcode = self.printer.lookup_object("gcode") + gcode.register_mux_command( + "SET_PIN", "PIN", pin_name, self.cmd_SET_PIN, desc=self.cmd_SET_PIN_help + ) + def get_status(self, eventtime): - return {'value': self.last_value} + return {"value": self.last_value} + def _set_pin(self, print_time, value): if value == self.last_value: - return "discard", 0. + return "discard", 0.0 self.last_value = value if self.is_pwm: self.mcu_pin.set_pwm(print_time, value) else: self.mcu_pin.set_digital(print_time, value) + def _template_update(self, text): try: value = float(text) except ValueError as e: logging.exception("output_pin template render error") - value = 0. + value = 0.0 self.gcrq.send_async_request(value) + cmd_SET_PIN_help = "Set the value of an output pin" + def cmd_SET_PIN(self, gcmd): - value = gcmd.get_float('VALUE', None, minval=0., maxval=self.scale) - template = gcmd.get('TEMPLATE', None) + value = gcmd.get_float("VALUE", None, minval=0.0, maxval=self.scale) + template = gcmd.get("TEMPLATE", None) if (value is None) == (template is None): raise gcmd.error("SET_PIN command must specify VALUE or TEMPLATE") # Check for template setting @@ -248,10 +270,11 @@ class PrinterOutputPin: return # Read requested value value /= self.scale - if not self.is_pwm and value not in [0., 1.]: + if not self.is_pwm and value not in [0.0, 1.0]: raise gcmd.error("Invalid pin value") # Queue requested value self.gcrq.queue_gcode_request(value) + def load_config_prefix(config): return PrinterOutputPin(config) |