aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/output_pin.py
diff options
context:
space:
mode:
Diffstat (limited to 'klippy/extras/output_pin.py')
-rw-r--r--klippy/extras/output_pin.py101
1 files changed, 62 insertions, 39 deletions
diff --git a/klippy/extras/output_pin.py b/klippy/extras/output_pin.py
index bde7ea69..85a9e91c 100644
--- a/klippy/extras/output_pin.py
+++ b/klippy/extras/output_pin.py
@@ -11,6 +11,7 @@ from .display import display
# G-Code request queuing helper
######################################################################
+
# Helper code to queue g-code requests
class GCodeRequestQueue:
def __init__(self, config, mcu, callback):
@@ -18,12 +19,14 @@ class GCodeRequestQueue:
self.mcu = mcu
self.callback = callback
self.rqueue = []
- self.next_min_flush_time = 0.
+ self.next_min_flush_time = 0.0
self.toolhead = None
mcu.register_flush_callback(self._flush_notification)
printer.register_event_handler("klippy:connect", self._handle_connect)
+
def _handle_connect(self):
- self.toolhead = self.printer.lookup_object('toolhead')
+ self.toolhead = self.printer.lookup_object("toolhead")
+
def _flush_notification(self, print_time, clock):
min_sched_time = self.mcu.min_schedule_time()
rqueue = self.rqueue
@@ -37,26 +40,30 @@ class GCodeRequestQueue:
pos += 1
req_pt, req_val = rqueue[pos]
# Invoke callback for the request
- min_wait = 0.
+ min_wait = 0.0
ret = self.callback(next_time, req_val)
if ret is not None:
# Handle special cases
action, min_wait = ret
if action == "discard":
- del rqueue[:pos+1]
+ del rqueue[: pos + 1]
continue
if action == "delay":
pos -= 1
- del rqueue[:pos+1]
+ del rqueue[: pos + 1]
self.next_min_flush_time = next_time + max(min_wait, min_sched_time)
# Ensure following queue items are flushed
self.toolhead.note_mcu_movequeue_activity(self.next_min_flush_time)
+
def _queue_request(self, print_time, value):
self.rqueue.append((print_time, value))
self.toolhead.note_mcu_movequeue_activity(print_time)
+
def queue_gcode_request(self, value):
self.toolhead.register_lookahead_callback(
- (lambda pt: self._queue_request(pt, value)))
+ (lambda pt: self._queue_request(pt, value))
+ )
+
def send_async_request(self, value, print_time=None):
min_sched_time = self.mcu.min_schedule_time()
if print_time is None:
@@ -65,7 +72,7 @@ class GCodeRequestQueue:
while 1:
next_time = max(print_time, self.next_min_flush_time)
# Invoke callback for the request
- action, min_wait = "normal", 0.
+ action, min_wait = "normal", 0.0
ret = self.callback(next_time, value)
if ret is not None:
# Handle special cases
@@ -84,6 +91,7 @@ class GCodeRequestQueue:
# Time between each template update
RENDER_TIME = 0.500
+
# Main template evaluation code
class PrinterTemplateEvaluator:
def __init__(self, config):
@@ -95,11 +103,13 @@ class PrinterTemplateEvaluator:
self.templates = dtemplates.get_display_templates()
gcode_macro = self.printer.load_object(config, "gcode_macro")
self.create_template_context = gcode_macro.create_template_context
+
def _activate_timer(self):
if self.render_timer is not None or not self.active_templates:
return
reactor = self.printer.get_reactor()
self.render_timer = reactor.register_timer(self._render, reactor.NOW)
+
def _activate_template(self, callback, template, lparams, flush_callback):
if template is not None:
# Build a unique id to make it possible to cache duplicate rendering
@@ -109,11 +119,11 @@ class PrinterTemplateEvaluator:
except TypeError as e:
# lparams is not static, so disable caching
uid = None
- self.active_templates[callback] = (
- uid, template, lparams, flush_callback)
+ self.active_templates[callback] = (uid, template, lparams, flush_callback)
return
if callback in self.active_templates:
del self.active_templates[callback]
+
def _render(self, eventtime):
if not self.active_templates:
# Nothing to do - unregister timer
@@ -123,9 +133,11 @@ class PrinterTemplateEvaluator:
return reactor.NEVER
# Setup gcode_macro template context
context = self.create_template_context(eventtime)
+
def render(name, **kwargs):
return self.templates[name].render(context, **kwargs)
- context['render'] = render
+
+ context["render"] = render
# Render all templates
flush_callbacks = {}
render_cache = {}
@@ -143,11 +155,12 @@ class PrinterTemplateEvaluator:
if flush_callback is not None:
flush_callbacks[flush_callback] = 1
callback(text)
- context.clear() # Remove circular references for better gc
+ context.clear() # Remove circular references for better gc
# Invoke optional flush callbacks
for flush_callback in flush_callbacks.keys():
flush_callback()
return eventtime + RENDER_TIME
+
def set_template(self, gcmd, callback, flush_callback=None):
template = None
lparams = {}
@@ -162,8 +175,7 @@ class PrinterTemplateEvaluator:
continue
p = p.lower()
if p not in tparams:
- raise gcmd.error("Invalid display_template parameter: %s"
- % (p,))
+ raise gcmd.error("Invalid display_template parameter: %s" % (p,))
try:
lparams[p] = ast.literal_eval(v)
except ValueError as e:
@@ -171,6 +183,7 @@ class PrinterTemplateEvaluator:
self._activate_template(callback, template, lparams, flush_callback)
self._activate_timer()
+
def lookup_template_eval(config):
printer = config.get_printer()
te = printer.lookup_object("template_evaluator", None)
@@ -184,62 +197,71 @@ def lookup_template_eval(config):
# Main output pin handling
######################################################################
+
class PrinterOutputPin:
def __init__(self, config):
self.printer = config.get_printer()
- ppins = self.printer.lookup_object('pins')
+ ppins = self.printer.lookup_object("pins")
# Determine pin type
- self.is_pwm = config.getboolean('pwm', False)
+ self.is_pwm = config.getboolean("pwm", False)
if self.is_pwm:
- self.mcu_pin = ppins.setup_pin('pwm', config.get('pin'))
+ self.mcu_pin = ppins.setup_pin("pwm", config.get("pin"))
max_duration = self.mcu_pin.get_mcu().max_nominal_duration()
- cycle_time = config.getfloat('cycle_time', 0.100, above=0.,
- maxval=max_duration)
- hardware_pwm = config.getboolean('hardware_pwm', False)
+ cycle_time = config.getfloat(
+ "cycle_time", 0.100, above=0.0, maxval=max_duration
+ )
+ hardware_pwm = config.getboolean("hardware_pwm", False)
self.mcu_pin.setup_cycle_time(cycle_time, hardware_pwm)
- self.scale = config.getfloat('scale', 1., above=0.)
+ self.scale = config.getfloat("scale", 1.0, above=0.0)
else:
- self.mcu_pin = ppins.setup_pin('digital_out', config.get('pin'))
- self.scale = 1.
- self.mcu_pin.setup_max_duration(0.)
+ self.mcu_pin = ppins.setup_pin("digital_out", config.get("pin"))
+ self.scale = 1.0
+ self.mcu_pin.setup_max_duration(0.0)
# Determine start and shutdown values
- self.last_value = config.getfloat(
- 'value', 0., minval=0., maxval=self.scale) / self.scale
- self.shutdown_value = config.getfloat(
- 'shutdown_value', 0., minval=0., maxval=self.scale) / self.scale
+ self.last_value = (
+ config.getfloat("value", 0.0, minval=0.0, maxval=self.scale) / self.scale
+ )
+ self.shutdown_value = (
+ config.getfloat("shutdown_value", 0.0, minval=0.0, maxval=self.scale)
+ / self.scale
+ )
self.mcu_pin.setup_start_value(self.last_value, self.shutdown_value)
# Create gcode request queue
- self.gcrq = GCodeRequestQueue(config, self.mcu_pin.get_mcu(),
- self._set_pin)
+ self.gcrq = GCodeRequestQueue(config, self.mcu_pin.get_mcu(), self._set_pin)
# Template handling
self.template_eval = lookup_template_eval(config)
# Register commands
pin_name = config.get_name().split()[1]
- gcode = self.printer.lookup_object('gcode')
- gcode.register_mux_command("SET_PIN", "PIN", pin_name,
- self.cmd_SET_PIN,
- desc=self.cmd_SET_PIN_help)
+ gcode = self.printer.lookup_object("gcode")
+ gcode.register_mux_command(
+ "SET_PIN", "PIN", pin_name, self.cmd_SET_PIN, desc=self.cmd_SET_PIN_help
+ )
+
def get_status(self, eventtime):
- return {'value': self.last_value}
+ return {"value": self.last_value}
+
def _set_pin(self, print_time, value):
if value == self.last_value:
- return "discard", 0.
+ return "discard", 0.0
self.last_value = value
if self.is_pwm:
self.mcu_pin.set_pwm(print_time, value)
else:
self.mcu_pin.set_digital(print_time, value)
+
def _template_update(self, text):
try:
value = float(text)
except ValueError as e:
logging.exception("output_pin template render error")
- value = 0.
+ value = 0.0
self.gcrq.send_async_request(value)
+
cmd_SET_PIN_help = "Set the value of an output pin"
+
def cmd_SET_PIN(self, gcmd):
- value = gcmd.get_float('VALUE', None, minval=0., maxval=self.scale)
- template = gcmd.get('TEMPLATE', None)
+ value = gcmd.get_float("VALUE", None, minval=0.0, maxval=self.scale)
+ template = gcmd.get("TEMPLATE", None)
if (value is None) == (template is None):
raise gcmd.error("SET_PIN command must specify VALUE or TEMPLATE")
# Check for template setting
@@ -248,10 +270,11 @@ class PrinterOutputPin:
return
# Read requested value
value /= self.scale
- if not self.is_pwm and value not in [0., 1.]:
+ if not self.is_pwm and value not in [0.0, 1.0]:
raise gcmd.error("Invalid pin value")
# Queue requested value
self.gcrq.queue_gcode_request(value)
+
def load_config_prefix(config):
return PrinterOutputPin(config)