aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/reactor.py
diff options
context:
space:
mode:
authorKevin O'Connor <kevin@koconnor.net>2018-06-26 09:24:19 -0400
committerKevin O'Connor <kevin@koconnor.net>2018-06-30 14:13:47 -0400
commit4061026c2548ff84873b06a9943d9d6ea3e2ddee (patch)
tree669b5bd4cde4de632341869f558aede6209be458 /klippy/reactor.py
parentecf53e6194a7ccdc7015faa79ad3f74f6f7a5d1a (diff)
downloadkutter-4061026c2548ff84873b06a9943d9d6ea3e2ddee.tar.gz
kutter-4061026c2548ff84873b06a9943d9d6ea3e2ddee.tar.xz
kutter-4061026c2548ff84873b06a9943d9d6ea3e2ddee.zip
reactor: Add ability to register callbacks
Add the ability to register callbacks - both asynchronous (ie, from another thread) and synchronous. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'klippy/reactor.py')
-rw-r--r--klippy/reactor.py64
1 files changed, 58 insertions, 6 deletions
diff --git a/klippy/reactor.py b/klippy/reactor.py
index 9bce6a9b..76e7d65e 100644
--- a/klippy/reactor.py
+++ b/klippy/reactor.py
@@ -1,17 +1,27 @@
# File descriptor and timer event helper
#
-# Copyright (C) 2016,2017 Kevin O'Connor <kevin@koconnor.net>
+# Copyright (C) 2016-2018 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
-import select, math, time
+import os, select, math, time, Queue
import greenlet
-import chelper
+import chelper, util
class ReactorTimer:
def __init__(self, callback, waketime):
self.callback = callback
self.waketime = waketime
+class ReactorCallback:
+ def __init__(self, reactor, callback):
+ self.reactor = reactor
+ self.timer = reactor.register_timer(self.invoke, reactor.NOW)
+ self.callback = callback
+ def invoke(self, eventtime):
+ self.reactor.unregister_timer(self.timer)
+ self.callback(eventtime)
+ return self.reactor.NEVER
+
class ReactorFileHandler:
def __init__(self, fd, callback):
self.fd = fd
@@ -28,13 +38,20 @@ class SelectReactor:
NOW = 0.
NEVER = 9999999999999999.
def __init__(self):
- self._fds = []
+ # Main code
+ self._process = False
+ self.monotonic = chelper.get_ffi()[1].get_monotonic
+ # Timers
self._timers = []
self._next_timer = self.NEVER
- self._process = False
+ # Callbacks
+ self._pipe_fds = None
+ self._async_queue = Queue.Queue()
+ # File descriptors
+ self._fds = []
+ # Greenlets
self._g_dispatch = None
self._greenlets = []
- self.monotonic = chelper.get_ffi()[1].get_monotonic
# Timers
def _note_time(self, t):
nexttime = t.waketime
@@ -70,6 +87,36 @@ class SelectReactor:
if eventtime >= self._next_timer:
return 0.
return min(1., max(.001, self._next_timer - self.monotonic()))
+ # Callbacks
+ def register_callback(self, callback):
+ ReactorCallback(self, callback)
+ def register_async_callback(self, callback):
+ self._async_queue.put_nowait(callback)
+ try:
+ os.write(self._pipe_fds[1], '.')
+ except os.error:
+ pass
+ def _got_pipe_signal(self, eventtime):
+ try:
+ os.read(self._pipe_fds[0], 4096)
+ except os.error:
+ pass
+ while 1:
+ try:
+ callback = self._async_queue.get_nowait()
+ except Queue.Empty:
+ break
+ ReactorCallback(self, callback)
+ def _setup_async_callbacks(self):
+ self._pipe_fds = os.pipe()
+ util.set_nonblock(self._pipe_fds[0])
+ util.set_nonblock(self._pipe_fds[1])
+ self.register_fd(self._pipe_fds[0], self._got_pipe_signal)
+ def __del__(self):
+ if self._pipe_fds is not None:
+ os.close(self._pipe_fds[0])
+ os.close(self._pipe_fds[1])
+ self._pipe_fds = None
# Greenlets
def _sys_pause(self, waketime):
# Pause using system sleep for when reactor not running
@@ -91,10 +138,13 @@ class SelectReactor:
g.timer = self.register_timer(g.switch, waketime)
return g_next.switch()
def _end_greenlet(self, g_old):
+ # Cache this greenlet for later use
self._greenlets.append(g_old)
self.unregister_timer(g_old.timer)
g_old.timer = None
+ # Switch to existing dispatch
self._g_dispatch.switch(self.NEVER)
+ # This greenlet was reactivated - prepare for main processing loop
self._g_dispatch = g_old
# File descriptors
def register_fd(self, fd, callback):
@@ -119,6 +169,8 @@ class SelectReactor:
break
self._g_dispatch = None
def run(self):
+ if self._pipe_fds is None:
+ self._setup_async_callbacks()
self._process = True
g_next = ReactorGreenlet(run=self._dispatch_loop)
g_next.switch()