aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/extras/buttons.py
blob: 0b90f1a371be8908d16617a50c24feea0c8acb20 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# Support for button detection and callbacks
#
# Copyright (C) 2018-2023  Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging


######################################################################
# Button state tracking
######################################################################

# Converted to MCU clock ticks and sent as the "rest_ticks" field of the
# "buttons_query" config command (presumably the interval, in seconds,
# between button samples on the MCU - confirm against the firmware).
QUERY_TIME = 0.002
# Sent as the "retransmit_count" field of the "buttons_query" command.
RETRANSMIT_COUNT = 50


# Tracks a group of digital button pins that live on one MCU.  Pin state
# samples arrive in "buttons_state" messages; every pin group whose
# state changed is reported to its callback through the reactor.
class MCU_buttons:
    def __init__(self, printer, mcu):
        self.reactor = printer.get_reactor()
        self.mcu = mcu
        self.mcu.register_config_callback(self.build_config)
        # (pin, pullup) tuples - list position is the pin's bit number
        self.pin_list = []
        # (mask, shift, callback) tuples - one per setup_buttons() call
        self.callbacks = []
        self.invert = 0
        self.last_button = 0
        self.ack_cmd = None
        self.ack_count = 0

    # Append "pins" (a sequence of pin parameter dicts with "pin",
    # "pullup" and "invert" keys) to this group and arrange for
    # callback(receive_time, state) to be invoked when any of them
    # changes.  "state" holds the new pins' bits shifted down so that
    # the first pin of this call is bit 0.
    def setup_buttons(self, pins, callback):
        first_bit = len(self.pin_list)
        group_mask = 0
        for offset, params in enumerate(pins):
            bit = 1 << (first_bit + offset)
            if params["invert"]:
                self.invert |= bit
            group_mask |= bit
            self.pin_list.append((params["pin"], params["pullup"]))
        self.callbacks.append((group_mask, first_bit, callback))

    # MCU config callback: emit the button configuration commands and
    # subscribe to the resulting state reports.  Does nothing if no
    # pins were ever registered.
    def build_config(self):
        pin_count = len(self.pin_list)
        if pin_count == 0:
            return
        mcu = self.mcu
        oid = self.oid = mcu.create_oid()
        mcu.add_config_cmd(
            "config_buttons oid=%d button_count=%d" % (oid, pin_count)
        )
        for pos, (pin, pull_up) in enumerate(self.pin_list):
            add_cmd = "buttons_add oid=%d pos=%d pin=%s pull_up=%d" % (
                oid,
                pos,
                pin,
                pull_up,
            )
            mcu.add_config_cmd(add_cmd, is_init=True)
        ack_queue = mcu.alloc_command_queue()
        self.ack_cmd = mcu.lookup_command(
            "buttons_ack oid=%c count=%c", cq=ack_queue
        )
        query_clock = mcu.get_query_slot(oid)
        rest_ticks = mcu.seconds_to_clock(QUERY_TIME)
        query_cmd = (
            "buttons_query oid=%d clock=%d"
            " rest_ticks=%d retransmit_count=%d invert=%d"
        ) % (oid, query_clock, rest_ticks, RETRANSMIT_COUNT, self.invert)
        mcu.add_config_cmd(query_cmd, is_init=True)
        mcu.register_response(self.handle_buttons_state, "buttons_state", oid)

    # Handle a "buttons_state" message: work out which of the reported
    # samples have not been seen yet, acknowledge them, and queue a
    # callback for every pin group that changed in each new sample.
    def handle_buttons_state(self, params):
        acked = self.ack_count
        # The message carries only the low 8 bits of the ack counter;
        # sign-extend the difference to the host's full counter
        delta = (params["ack_count"] - acked) & 0xFF
        if delta & 0x80:
            delta -= 0x100
        samples = bytearray(params["state"])
        # Samples before the host's ack position are retransmissions
        fresh = delta + len(samples)
        if fresh <= 0:
            return
        unseen = samples[-fresh:]
        # Acknowledge before dispatching
        self.ack_cmd.send([self.oid, fresh])
        self.ack_count = acked + fresh
        receive_time = params["#receive_time"]
        for raw in unseen:
            current = raw ^ self.invert
            changed = current ^ self.last_button
            self.last_button = current
            for mask, shift, callback in self.callbacks:
                if not changed & mask:
                    continue
                state = (current & mask) >> shift
                # Run the user callback from the main reactor thread
                self.reactor.register_async_callback(
                    self._bind_event(callback, receive_time, state)
                )

    # Freeze (callback, btime, state) into a function taking the
    # reactor's eventtime argument (which is ignored).
    @staticmethod
    def _bind_event(callback, btime, state):
        def run(eventtime):
            return callback(btime, state)

        return run


######################################################################
# ADC button tracking
######################################################################

# Report time handed to the ADC pin's setup_adc_callback()
ADC_REPORT_TIME = 0.015
# A decoded button must stay unchanged for at least this long (measured
# on the read_time passed to adc_callback) before it is reported
ADC_DEBOUNCE_TIME = 0.025
# Sample time and sample count handed to the ADC pin's setup_adc_sample()
ADC_SAMPLE_TIME = 0.001
ADC_SAMPLE_COUNT = 6


# Decodes several buttons that share a single ADC pin.  Each button owns
# a (min_value, max_value) window; a reading that falls inside a window
# and stays there for ADC_DEBOUNCE_TIME is reported as a press.
class MCU_ADC_buttons:
    def __init__(self, printer, pin, pullup):
        self.reactor = printer.get_reactor()
        # (min_value, max_value, callback) tuples in registration order
        self.buttons = []
        # Button index decoded from the previous reading (None = none)
        self.last_button = None
        # Button index currently reported as pressed (None = none)
        self.last_pressed = None
        self.last_debouncetime = 0
        self.pullup = pullup
        self.pin = pin
        # Envelope over all registered windows, used as a quick reject
        self.min_value = 999999999999.9
        self.max_value = 0.0
        pins = printer.lookup_object("pins")
        self.mcu_adc = pins.setup_pin("adc", self.pin)
        self.mcu_adc.setup_adc_sample(ADC_SAMPLE_TIME, ADC_SAMPLE_COUNT)
        self.mcu_adc.setup_adc_callback(ADC_REPORT_TIME, self.adc_callback)
        adc_registry = printer.lookup_object("query_adc")
        adc_registry.register_adc("adc_button:" + pin.strip(), self.mcu_adc)

    # Register a button window.  callback(eventtime, state) is queued
    # with state True on press and False on release.
    def setup_button(self, min_value, max_value, callback):
        self.buttons.append((min_value, max_value, callback))
        self.min_value = min(self.min_value, min_value)
        self.max_value = max(self.max_value, max_value)

    # Process one ADC reading: decode which button (if any) it selects,
    # debounce it, and report press/release transitions.
    def adc_callback(self, read_time, read_value):
        # Clamp so the division below can neither hit zero nor blow up
        adc = max(0.00001, min(0.99999, read_value))
        # presumably the resistance seen through the pull-up divider -
        # confirm against the hardware documentation
        value = self.pullup * adc / (1.0 - adc)

        # First window (in registration order) strictly containing value
        btn = None
        if self.min_value <= value <= self.max_value:
            btn = next(
                (
                    idx
                    for idx, (low, high, _cb) in enumerate(self.buttons)
                    if low < value < high
                ),
                None,
            )

        # Any change (noise or a real press) restarts the debounce timer
        stable = not (btn != self.last_button)
        if not stable:
            self.last_debouncetime = read_time

        settled = (read_time - self.last_debouncetime) >= ADC_DEBOUNCE_TIME
        if settled and stable and self.last_pressed != btn:
            # Release whatever was pressed before announcing the new one
            if self.last_pressed is not None:
                self.call_button(self.last_pressed, False)
                self.last_pressed = None
            if btn is not None:
                self.call_button(btn, True)
                self.last_pressed = btn

        self.last_button = btn

    # Queue the callback of button index "button" on the reactor; the
    # callback receives the reactor's eventtime and "state".
    def call_button(self, button, state):
        _low, _high, callback = self.buttons[button]
        self.reactor.register_async_callback(
            (lambda e, cb=callback, s=state: cb(e, s))
        )


######################################################################
# Rotary Encoders
######################################################################


# Rotary encoder handler https://github.com/brianlow/Rotary
# Copyright 2011 Ben Buxton (bb@cactii.net).
# Licenced under the GNU GPL Version 3.
# Table driven quadrature decoder.  Subclasses supply ENCODER_STATES, a
# tuple of rows (one per state number) each holding four entries (one
# per two-bit pin state).  An entry is the next state number, optionally
# OR-ed with a direction flag that triggers the matching callback.
class BaseRotaryEncoder:
    R_START = 0x0
    R_DIR_CW = 0x10
    R_DIR_CCW = 0x20
    R_DIR_MSK = R_DIR_CW | R_DIR_CCW

    def __init__(self, cw_callback, ccw_callback):
        self.encoder_state = self.R_START
        self.cw_callback = cw_callback
        self.ccw_callback = ccw_callback

    # Advance the state machine with a new two-bit pin state and invoke
    # cw_callback(eventtime) / ccw_callback(eventtime) if the table
    # entry carries a direction flag.
    def encoder_callback(self, eventtime, state):
        # Low nibble selects the row; the direction flags from the
        # previous transition are discarded here
        row = self.ENCODER_STATES[self.encoder_state & 0xF]
        entry = row[state & 0x3]
        self.encoder_state = entry
        direction = entry & self.R_DIR_MSK
        if direction == self.R_DIR_CW:
            self.cw_callback(eventtime)
        elif direction == self.R_DIR_CCW:
            self.ccw_callback(eventtime)


# Rotary encoder decoder using the full-step transition table.
#
# BaseRotaryEncoder.encoder_callback() indexes ENCODER_STATES first by
# the current state number (encoder_state & 0xF) and then by the two-bit
# pin state (state & 0x3), so the rows below must appear in the order of
# the numeric state values (R_START=0x0 ... R_CCW_NEXT=0x6) and each row
# must hold exactly four entries.
class FullStepRotaryEncoder(BaseRotaryEncoder):
    # State numbers; each doubles as a row index into ENCODER_STATES
    R_CW_FINAL = 0x1
    R_CW_BEGIN = 0x2
    R_CW_NEXT = 0x3
    R_CCW_BEGIN = 0x4
    R_CCW_FINAL = 0x5
    R_CCW_NEXT = 0x6

    # Use the full-step state table (emits a code at 00 only)
    # An entry OR-ed with R_DIR_CW / R_DIR_CCW makes the base class
    # invoke the clockwise / counter-clockwise callback on that
    # transition.
    ENCODER_STATES = (
        # R_START
        (BaseRotaryEncoder.R_START, R_CW_BEGIN, R_CCW_BEGIN, BaseRotaryEncoder.R_START),
        # R_CW_FINAL
        (
            R_CW_NEXT,
            BaseRotaryEncoder.R_START,
            R_CW_FINAL,
            BaseRotaryEncoder.R_START | BaseRotaryEncoder.R_DIR_CW,
        ),
        # R_CW_BEGIN
        (R_CW_NEXT, R_CW_BEGIN, BaseRotaryEncoder.R_START, BaseRotaryEncoder.R_START),
        # R_CW_NEXT
        (R_CW_NEXT, R_CW_BEGIN, R_CW_FINAL, BaseRotaryEncoder.R_START),
        # R_CCW_BEGIN
        (R_CCW_NEXT, BaseRotaryEncoder.R_START, R_CCW_BEGIN, BaseRotaryEncoder.R_START),
        # R_CCW_FINAL
        (
            R_CCW_NEXT,
            R_CCW_FINAL,
            BaseRotaryEncoder.R_START,
            BaseRotaryEncoder.R_START | BaseRotaryEncoder.R_DIR_CCW,
        ),
        # R_CCW_NEXT
        (R_CCW_NEXT, R_CCW_FINAL, R_CCW_BEGIN, BaseRotaryEncoder.R_START),
    )


# Rotary encoder decoder using the half-step transition table.
#
# BaseRotaryEncoder.encoder_callback() indexes ENCODER_STATES first by
# the current state number (encoder_state & 0xF) and then by the two-bit
# pin state (state & 0x3), so the rows below must appear in the order of
# the numeric state values (R_START=0x0 ... R_CCW_BEGIN_M=0x5) and each
# row must hold exactly four entries.
class HalfStepRotaryEncoder(BaseRotaryEncoder):
    # Use the half-step state table (emits a code at 00 and 11)
    # State numbers; each doubles as a row index into ENCODER_STATES
    R_CCW_BEGIN = 0x1
    R_CW_BEGIN = 0x2
    R_START_M = 0x3
    R_CW_BEGIN_M = 0x4
    R_CCW_BEGIN_M = 0x5

    # An entry OR-ed with R_DIR_CW / R_DIR_CCW makes the base class
    # invoke the clockwise / counter-clockwise callback on that
    # transition.
    ENCODER_STATES = (
        # R_START (00)
        (R_START_M, R_CW_BEGIN, R_CCW_BEGIN, BaseRotaryEncoder.R_START),
        # R_CCW_BEGIN
        (
            R_START_M | BaseRotaryEncoder.R_DIR_CCW,
            BaseRotaryEncoder.R_START,
            R_CCW_BEGIN,
            BaseRotaryEncoder.R_START,
        ),
        # R_CW_BEGIN
        (
            R_START_M | BaseRotaryEncoder.R_DIR_CW,
            R_CW_BEGIN,
            BaseRotaryEncoder.R_START,
            BaseRotaryEncoder.R_START,
        ),
        # R_START_M (11)
        (R_START_M, R_CCW_BEGIN_M, R_CW_BEGIN_M, BaseRotaryEncoder.R_START),
        # R_CW_BEGIN_M
        (
            R_START_M,
            R_START_M,
            R_CW_BEGIN_M,
            BaseRotaryEncoder.R_START | BaseRotaryEncoder.R_DIR_CW,
        ),
        # R_CCW_BEGIN_M
        (
            R_START_M,
            R_CCW_BEGIN_M,
            R_START_M,
            BaseRotaryEncoder.R_START | BaseRotaryEncoder.R_DIR_CCW,
        ),
    )


# Time based debounce filter placed between a raw button callback and
# the user's action.  A state change is forwarded to button_action only
# if no newer raw event arrived during "debounce_delay" (a config option
# that defaults to 0.0).
class DebounceButton:
    def __init__(self, config, button_action):
        printer = config.get_printer()
        self.printer = printer
        self.reactor = printer.get_reactor()
        self.button_action = button_action
        self.debounce_delay = config.getfloat("debounce_delay", 0.0, minval=0.0)
        # logical_state: last state forwarded to button_action
        # physical_state: most recent raw state seen
        self.logical_state = self.physical_state = None
        self.latest_eventtime = None

    # Raw button callback: remember the event and, if it differs from
    # the state already reported, schedule a debounce check.
    def button_handler(self, eventtime, state):
        self.latest_eventtime = eventtime
        self.physical_state = state
        if self.logical_state != self.physical_state:
            self.reactor.register_callback(
                self._debounce_event, eventtime + self.debounce_delay
            )

    # Reactor callback scheduled by button_handler().
    def _debounce_event(self, eventtime):
        # Nothing to report if the button returned to the reported state
        if self.logical_state == self.physical_state:
            return
        # A newer raw event has its own pending check; let that one win
        scheduled_for = eventtime - self.debounce_delay
        if scheduled_for < self.latest_eventtime:
            return
        # Commit the transition and notify the user
        settled_state = self.physical_state
        self.logical_state = settled_state
        self.button_action(self.latest_eventtime, settled_state)


######################################################################
# Button registration code
######################################################################


# Printer object through which other modules register digital buttons,
# ADC buttons and rotary encoders.
class PrinterButtons:
    def __init__(self, config):
        self.printer = config.get_printer()
        self.printer.load_object(config, "query_adc")
        # chip_name -> MCU_buttons currently accepting new pins
        self.mcu_buttons = {}
        # pin -> MCU_ADC_buttons
        self.adc_buttons = {}

    # Register callback(eventtime, state) for the [min_val, max_val]
    # window on ADC pin "pin", creating the pin's tracker on first use.
    def register_adc_button(self, pin, min_val, max_val, pullup, callback):
        tracker = self.adc_buttons.get(pin)
        if tracker is None:
            tracker = MCU_ADC_buttons(self.printer, pin, pullup)
            self.adc_buttons[pin] = tracker
        tracker.setup_button(min_val, max_val, callback)

    # Like register_buttons([pin], ...) but filtered through a
    # DebounceButton configured from "config".
    def register_debounce_button(self, pin, callback, config):
        filtered = DebounceButton(config, callback)
        return self.register_buttons([pin], filtered.button_handler)

    # Like register_adc_button() but filtered through a DebounceButton
    # configured from "config".
    def register_debounce_adc_button(
        self, pin, min_val, max_val, pullup, callback, config
    ):
        filtered = DebounceButton(config, callback)
        handler = filtered.button_handler
        return self.register_adc_button(pin, min_val, max_val, pullup, handler)

    # Register callback(eventtime) to run on ADC button presses only.
    def register_adc_button_push(self, pin, min_val, max_val, pullup, callback):
        def on_change(eventtime, state, callback=callback):
            if not state:
                return
            callback(eventtime)

        self.register_adc_button(pin, min_val, max_val, pullup, on_change)

    # Register callback(eventtime, state) for a set of digital pins that
    # must all be on the same mcu.
    def register_buttons(self, pins, callback):
        ppins = self.printer.lookup_object("pins")
        # Resolve the pin descriptions
        mcu = None
        mcu_name = None
        resolved = []
        for pin in pins:
            params = ppins.lookup_pin(pin, can_invert=True, can_pullup=True)
            if mcu is not None and params["chip"] != mcu:
                raise ppins.error("button pins must be on same mcu")
            mcu, mcu_name = params["chip"], params["chip_name"]
            resolved.append(params)
        # Reuse the mcu's current group unless these pins would push it
        # past eight entries, in which case a fresh group is started
        group = self.mcu_buttons.get(mcu_name)
        if group is None or len(group.pin_list) + len(resolved) > 8:
            group = MCU_buttons(self.printer, mcu)
            self.mcu_buttons[mcu_name] = group
        group.setup_buttons(resolved, callback)

    # Register a rotary encoder on pin1/pin2.  steps_per_detent selects
    # the decoder table: 2 (half-step) or 4 (full-step).
    def register_rotary_encoder(
        self, pin1, pin2, cw_callback, ccw_callback, steps_per_detent
    ):
        if steps_per_detent not in (2, 4):
            raise self.printer.config_error(
                "%d steps per detent not supported" % steps_per_detent
            )
        if steps_per_detent == 2:
            encoder = HalfStepRotaryEncoder(cw_callback, ccw_callback)
        else:
            encoder = FullStepRotaryEncoder(cw_callback, ccw_callback)
        self.register_buttons([pin1, pin2], encoder.encoder_callback)

    # Register callback(eventtime) to run on digital button presses only.
    def register_button_push(self, pin, callback):
        def on_change(eventtime, state, callback=callback):
            if not state:
                return
            callback(eventtime)

        self.register_buttons([pin], on_change)


# Module entry point: build the PrinterButtons object for "config"
# (presumably invoked by the host's module loader - confirm).
def load_config(config):
    printer_buttons = PrinterButtons(config)
    return printer_buttons