diff options
Diffstat (limited to 'klippy/reactor.py')
-rw-r--r-- | klippy/reactor.py | 80 |
1 files changed, 65 insertions, 15 deletions
diff --git a/klippy/reactor.py b/klippy/reactor.py index 412d53ed..c5979b40 100644 --- a/klippy/reactor.py +++ b/klippy/reactor.py @@ -7,26 +7,33 @@ import os, gc, select, math, time, logging, queue import greenlet import chelper, util -_NOW = 0. -_NEVER = 9999999999999999. +_NOW = 0.0 +_NEVER = 9999999999999999.0 + class ReactorTimer: def __init__(self, callback, waketime): self.callback = callback self.waketime = waketime + class ReactorCompletion: - class sentinel: pass + class sentinel: + pass + def __init__(self, reactor): self.reactor = reactor self.result = self.sentinel self.waiting = [] + def test(self): return self.result is not self.sentinel + def complete(self, result): self.result = result for wait in self.waiting: self.reactor.update_timer(wait.timer, self.reactor.NOW) + def wait(self, waketime=_NEVER, waketime_result=None): if self.result is self.sentinel: wait = greenlet.getcurrent() @@ -37,31 +44,37 @@ class ReactorCompletion: return waketime_result return self.result + class ReactorCallback: def __init__(self, reactor, callback, waketime): self.reactor = reactor self.timer = reactor.register_timer(self.invoke, waketime) self.callback = callback self.completion = ReactorCompletion(reactor) + def invoke(self, eventtime): self.reactor.unregister_timer(self.timer) res = self.callback(eventtime) self.completion.complete(res) return self.reactor.NEVER + class ReactorFileHandler: def __init__(self, fd, read_callback, write_callback): self.fd = fd self.read_callback = read_callback self.write_callback = write_callback + def fileno(self): return self.fd + class ReactorGreenlet(greenlet.greenlet): def __init__(self, run): greenlet.greenlet.__init__(self, run=run) self.timer = None + class ReactorMutex: def __init__(self, reactor, is_locked): self.reactor = reactor @@ -70,8 +83,10 @@ class ReactorMutex: self.queue = [] self.lock = self.__enter__ self.unlock = self.__exit__ + def test(self): return self.is_locked + def __enter__(self): if not self.is_locked: self.is_locked = True @@ -84,6 +99,7 @@ class ReactorMutex: self.next_pending = False self.queue.pop(0) return + def __exit__(self, type=None, value=None, tb=None): if not self.queue: self.is_locked = False @@ -91,16 +107,18 @@ class ReactorMutex: self.next_pending = True self.reactor.update_timer(self.queue[0].timer, self.reactor.NOW) + class SelectReactor: NOW = _NOW NEVER = _NEVER + def __init__(self, gc_checking=False): # Main code self._process = False self.monotonic = chelper.get_ffi()[1].get_monotonic # Python garbage collection self._check_gc = gc_checking - self._last_gc_times = [0., 0., 0.] + self._last_gc_times = [0.0, 0.0, 0.0] # Timers self._timers = [] self._next_timer = self.NEVER @@ -114,12 +132,15 @@ class SelectReactor: self._g_dispatch = None self._greenlets = [] self._all_greenlets = [] + def get_gc_stats(self): return tuple(self._last_gc_times) + # Timers def update_timer(self, timer_handler, waketime): timer_handler.waketime = waketime self._next_timer = min(self._next_timer, waketime) + def register_timer(self, callback, waketime=NEVER): timer_handler = ReactorTimer(callback, waketime) timers = list(self._timers) @@ -127,15 +148,17 @@ class SelectReactor: self._timers = timers self._next_timer = min(self._next_timer, waketime) return timer_handler + def unregister_timer(self, timer_handler): timer_handler.waketime = self.NEVER timers = list(self._timers) timers.pop(timers.index(timer_handler)) self._timers = timers + def _check_timers(self, eventtime, busy): if eventtime < self._next_timer: if busy: - return 0. + return 0.0 if self._check_gc: gi = gc.get_count() if gi[0] >= 700: @@ -147,8 +170,8 @@ class SelectReactor: gc_level = 2 self._last_gc_times[gc_level] = eventtime gc.collect(gc_level) - return 0. - return min(1., max(.001, self._next_timer - eventtime)) + return 0.0 + return min(1.0, max(0.001, self._next_timer - eventtime)) self._next_timer = self.NEVER g_dispatch = self._g_dispatch for t in self._timers: @@ -159,29 +182,33 @@ class SelectReactor: if g_dispatch is not self._g_dispatch: self._next_timer = min(self._next_timer, waketime) self._end_greenlet(g_dispatch) - return 0. + return 0.0 self._next_timer = min(self._next_timer, waketime) - return 0. + return 0.0 + # Callbacks and Completions def completion(self): return ReactorCompletion(self) + def register_callback(self, callback, waketime=NOW): rcb = ReactorCallback(self, callback, waketime) return rcb.completion + # Asynchronous (from another thread) callbacks and completions def register_async_callback(self, callback, waketime=NOW): - self._async_queue.put_nowait( - (ReactorCallback, (self, callback, waketime))) + self._async_queue.put_nowait((ReactorCallback, (self, callback, waketime))) try: - os.write(self._pipe_fds[1], b'.') + os.write(self._pipe_fds[1], b".") except os.error: pass + def async_complete(self, completion, result): self._async_queue.put_nowait((completion.complete, (result,))) try: - os.write(self._pipe_fds[1], b'.') + os.write(self._pipe_fds[1], b".") except os.error: pass + def _got_pipe_signal(self, eventtime): try: os.read(self._pipe_fds[0], 4096) @@ -193,18 +220,21 @@ class SelectReactor: except queue.Empty: break func(*args) + def _setup_async_callbacks(self): self._pipe_fds = os.pipe() util.set_nonblock(self._pipe_fds[0]) util.set_nonblock(self._pipe_fds[1]) self.register_fd(self._pipe_fds[0], self._got_pipe_signal) + # Greenlets def _sys_pause(self, waketime): # Pause using system sleep for when reactor not running delay = waketime - self.monotonic() - if delay > 0.: + if delay > 0.0: time.sleep(delay) return self.monotonic() + def pause(self, waketime): g = greenlet.getcurrent() if g is not self._g_dispatch: @@ -225,6 +255,7 @@ class SelectReactor: eventtime = g_next.switch() # This greenlet activated from g.timer.callback (via _check_timers) return eventtime + def _end_greenlet(self, g_old): # Cache this greenlet for later use self._greenlets.append(g_old) @@ -234,19 +265,23 @@ class SelectReactor: self._g_dispatch.switch(self.NEVER) # This greenlet reactivated from pause() - return to main dispatch loop self._g_dispatch = g_old + # Mutexes def mutex(self, is_locked=False): return ReactorMutex(self, is_locked) + # File descriptors def register_fd(self, fd, read_callback, write_callback=None): file_handler = ReactorFileHandler(fd, read_callback, write_callback) self.set_fd_wake(file_handler, True, False) return file_handler + def unregister_fd(self, file_handler): if file_handler in self._read_fds: self._read_fds.pop(self._read_fds.index(file_handler)) if file_handler in self._write_fds: self._write_fds.pop(self._write_fds.index(file_handler)) + def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): if file_handler in self._read_fds: if not is_readable: @@ -258,6 +293,7 @@ class SelectReactor: self._write_fds.pop(self._write_fds.index(file_handler)) elif is_writeable: self._write_fds.append(file_handler) + # Main loop def _dispatch_loop(self): self._g_dispatch = g_dispatch = greenlet.getcurrent() @@ -283,6 +319,7 @@ class SelectReactor: eventtime = self.monotonic() break self._g_dispatch = None + def run(self): if self._pipe_fds is None: self._setup_async_callbacks() @@ -290,8 +327,10 @@ class SelectReactor: g_next = ReactorGreenlet(run=self._dispatch_loop) self._all_greenlets.append(g_next) g_next.switch() + def end(self): self._process = False + def finalize(self): self._g_dispatch = None self._greenlets = [] @@ -306,11 +345,13 @@ class SelectReactor: os.close(self._pipe_fds[1]) self._pipe_fds = None + class PollReactor(SelectReactor): def __init__(self, gc_checking=False): SelectReactor.__init__(self, gc_checking) self._poll = select.poll() self._fds = {} + # File descriptors def register_fd(self, fd, read_callback, write_callback=None): file_handler = ReactorFileHandler(fd, read_callback, write_callback) @@ -319,11 +360,13 @@ class PollReactor(SelectReactor): self._fds = fds self._poll.register(file_handler, select.POLLIN | select.POLLHUP) return file_handler + def unregister_fd(self, file_handler): self._poll.unregister(file_handler) fds = self._fds.copy() del fds[file_handler.fd] self._fds = fds + def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): flags = select.POLLHUP if is_readable: @@ -331,6 +374,7 @@ class PollReactor(SelectReactor): if is_writeable: flags |= select.POLLOUT self._poll.modify(file_handler, flags) + # Main loop def _dispatch_loop(self): self._g_dispatch = g_dispatch = greenlet.getcurrent() @@ -339,7 +383,7 @@ class PollReactor(SelectReactor): while self._process: timeout = self._check_timers(eventtime, busy) busy = False - res = self._poll.poll(int(math.ceil(timeout * 1000.))) + res = self._poll.poll(int(math.ceil(timeout * 1000.0))) eventtime = self.monotonic() for fd, event in res: busy = True @@ -357,11 +401,13 @@ class PollReactor(SelectReactor): break self._g_dispatch = None + class EPollReactor(SelectReactor): def __init__(self, gc_checking=False): SelectReactor.__init__(self, gc_checking) self._epoll = select.epoll() self._fds = {} + # File descriptors def register_fd(self, fd, read_callback, write_callback=None): file_handler = ReactorFileHandler(fd, read_callback, write_callback) @@ -370,11 +416,13 @@ class EPollReactor(SelectReactor): self._fds = fds self._epoll.register(fd, select.EPOLLIN | select.EPOLLHUP) return file_handler + def unregister_fd(self, file_handler): self._epoll.unregister(file_handler.fd) fds = self._fds.copy() del fds[file_handler.fd] self._fds = fds + def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): flags = select.POLLHUP if is_readable: @@ -382,6 +430,7 @@ class EPollReactor(SelectReactor): if is_writeable: flags |= select.EPOLLOUT self._epoll.modify(file_handler, flags) + # Main loop def _dispatch_loop(self): self._g_dispatch = g_dispatch = greenlet.getcurrent() @@ -408,6 +457,7 @@ class EPollReactor(SelectReactor): break self._g_dispatch = None + # Use the poll based reactor if it is available try: select.poll |