# File descriptor and timer event helper
#
# Copyright (C) 2016-2020 Kevin O'Connor
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import gc
import logging
import math
import os
import queue
import select
import time

import chelper
import greenlet
import util

# Waketime values meaning "run as soon as possible" and "do not run".
_NOW = 0.0
_NEVER = 9999999999999999.0


class ReactorTimer:
    """A callback paired with the time at which it should next be invoked."""

    def __init__(self, callback, waketime):
        self.callback = callback
        self.waketime = waketime


class ReactorCompletion:
    """Holder for a result that greenlets can wait on until it is set."""

    class sentinel:
        """Marker stored in ``result`` while no result has been set."""

    def __init__(self, reactor):
        self.reactor = reactor
        self.result = self.sentinel
        # Greenlets currently paused inside wait()
        self.waiting = []

    def test(self):
        """Return True once complete() has stored a result."""
        return self.result is not self.sentinel

    def complete(self, result):
        """Store `result` and reschedule every waiting greenlet to NOW."""
        self.result = result
        for wait in self.waiting:
            self.reactor.update_timer(wait.timer, self.reactor.NOW)

    def wait(self, waketime=_NEVER, waketime_result=None):
        """Return the result, pausing the current greenlet if it is not set.

        If there is still no result when the pause returns, `waketime_result`
        is returned instead.
        """
        if self.result is self.sentinel:
            wait = greenlet.getcurrent()
            self.waiting.append(wait)
            self.reactor.pause(waketime)
            self.waiting.remove(wait)
            if self.result is self.sentinel:
                return waketime_result
        return self.result


class ReactorCallback:
    """One-shot timer that runs a callback and completes a completion."""

    def __init__(self, reactor, callback, waketime):
        self.reactor = reactor
        self.timer = reactor.register_timer(self.invoke, waketime)
        self.callback = callback
        self.completion = ReactorCompletion(reactor)

    def invoke(self, eventtime):
        """Timer callback: unregister, run the callback, publish its result."""
        self.reactor.unregister_timer(self.timer)
        res = self.callback(eventtime)
        self.completion.complete(res)
        return self.reactor.NEVER


class ReactorFileHandler:
    """A file descriptor together with its read and write callbacks."""

    def __init__(self, fd, read_callback, write_callback):
        self.fd = fd
        self.read_callback = read_callback
        self.write_callback = write_callback

    def fileno(self):
        """Return the file descriptor (lets select/poll accept this object)."""
        return self.fd


class ReactorGreenlet(greenlet.greenlet):
    """Greenlet carrying a `timer` attribute (None until one is assigned)."""

    def __init__(self, run):
        greenlet.greenlet.__init__(self, run=run)
        self.timer = None


class ReactorMutex:
    """Mutex for greenlets; usable as a context manager or via lock/unlock."""

    def __init__(self, reactor, is_locked):
        self.reactor = reactor
        self.is_locked = is_locked
        # Set by __exit__ to tell the head of the queue it now owns the lock
        self.next_pending = False
        # Greenlets waiting for the lock, in arrival order
        self.queue = []
        self.lock = self.__enter__
        self.unlock = self.__exit__

    def test(self):
        """Return True if the mutex is currently locked."""
        return self.is_locked

    def __enter__(self):
        if not self.is_locked:
            self.is_locked = True
            return
        g = greenlet.getcurrent()
        self.queue.append(g)
        while 1:
            self.reactor.pause(self.reactor.NEVER)
            # Only proceed if the lock was handed over and we are next in line
            if self.next_pending and self.queue[0] is g:
                self.next_pending = False
                self.queue.pop(0)
                return

    def __exit__(self, type=None, value=None, tb=None):
        if not self.queue:
            self.is_locked = False
            return
        # Hand the (still locked) mutex to the first waiter and wake it
        self.next_pending = True
        self.reactor.update_timer(self.queue[0].timer, self.reactor.NOW)


class SelectReactor:
    """Timer and file descriptor event loop built on select.select()."""

    NOW = _NOW
    NEVER = _NEVER

    def __init__(self, gc_checking=False):
        # Main code
        self._process = False
        self.monotonic = chelper.get_ffi()[1].get_monotonic
        # Python garbage collection
        self._check_gc = gc_checking
        self._last_gc_times = [0.0, 0.0, 0.0]
        # Timers
        self._timers = []
        self._next_timer = self.NEVER
        # Callbacks
        self._pipe_fds = None
        self._async_queue = queue.Queue()
        # File descriptors
        self._read_fds = []
        self._write_fds = []
        # Greenlets
        self._g_dispatch = None
        self._greenlets = []
        self._all_greenlets = []

    def get_gc_stats(self):
        """Return the eventtime of the last collection run at each gc level."""
        return tuple(self._last_gc_times)

    # Timers
    def update_timer(self, timer_handler, waketime):
        """Set a new waketime on an existing timer."""
        timer_handler.waketime = waketime
        self._next_timer = min(self._next_timer, waketime)

    def register_timer(self, callback, waketime=NEVER):
        """Create, register and return a ReactorTimer for `callback`."""
        timer_handler = ReactorTimer(callback, waketime)
        # Replace the list instead of mutating it so that a _check_timers()
        # loop already iterating over the old list is not disturbed.
        timers = list(self._timers)
        timers.append(timer_handler)
        self._timers = timers
        self._next_timer = min(self._next_timer, waketime)
        return timer_handler

    def unregister_timer(self, timer_handler):
        """Remove a timer; raises ValueError if it is not registered."""
        timer_handler.waketime = self.NEVER
        timers = list(self._timers)
        timers.pop(timers.index(timer_handler))
        self._timers = timers

    def _check_timers(self, eventtime, busy):
        """Run the timers that are due and return the next poll timeout.

        The return value is in seconds. When no timer is due and the loop
        was not busy, this may instead run a garbage collection (if enabled).
        """
        if eventtime < self._next_timer:
            if busy:
                return 0.0
            if self._check_gc:
                gi = gc.get_count()
                if gi[0] >= 700:
                    # Reactor looks idle and gc is due - run it
                    gc_level = 0
                    if gi[1] >= 10:
                        gc_level = 1
                        if gi[2] >= 10:
                            gc_level = 2
                    self._last_gc_times[gc_level] = eventtime
                    gc.collect(gc_level)
                    return 0.0
            return min(1.0, max(0.001, self._next_timer - eventtime))
        self._next_timer = self.NEVER
        g_dispatch = self._g_dispatch
        for t in self._timers:
            waketime = t.waketime
            if eventtime >= waketime:
                t.waketime = self.NEVER
                # The callback's return value becomes the timer's new waketime
                t.waketime = waketime = t.callback(eventtime)
                if g_dispatch is not self._g_dispatch:
                    # Another greenlet took over dispatching during the
                    # callback - retire this one.
                    self._next_timer = min(self._next_timer, waketime)
                    self._end_greenlet(g_dispatch)
                    return 0.0
            self._next_timer = min(self._next_timer, waketime)
        return 0.0

    # Callbacks and Completions
    def completion(self):
        """Return a new ReactorCompletion bound to this reactor."""
        return ReactorCompletion(self)

    def register_callback(self, callback, waketime=NOW):
        """Schedule a one-shot `callback`; return its completion."""
        rcb = ReactorCallback(self, callback, waketime)
        return rcb.completion

    # Asynchronous (from another thread) callbacks and completions
    def register_async_callback(self, callback, waketime=NOW):
        """Queue creation of a ReactorCallback and signal the reactor pipe."""
        self._async_queue.put_nowait((ReactorCallback, (self, callback, waketime)))
        try:
            os.write(self._pipe_fds[1], b".")
        except OSError:
            pass

    def async_complete(self, completion, result):
        """Queue `completion.complete(result)` and signal the reactor pipe."""
        self._async_queue.put_nowait((completion.complete, (result,)))
        try:
            os.write(self._pipe_fds[1], b".")
        except OSError:
            pass

    def _got_pipe_signal(self, eventtime):
        """Read callback for the pipe: drain it, then run all queued calls."""
        try:
            os.read(self._pipe_fds[0], 4096)
        except OSError:
            pass
        while 1:
            try:
                func, args = self._async_queue.get_nowait()
            except queue.Empty:
                break
            func(*args)

    def _setup_async_callbacks(self):
        """Create the non-blocking wakeup pipe and register its read end."""
        self._pipe_fds = os.pipe()
        util.set_nonblock(self._pipe_fds[0])
        util.set_nonblock(self._pipe_fds[1])
        self.register_fd(self._pipe_fds[0], self._got_pipe_signal)

    # Greenlets
    def _sys_pause(self, waketime):
        # Pause using system sleep for when reactor not running
        delay = waketime - self.monotonic()
        if delay > 0.0:
            time.sleep(delay)
        return self.monotonic()

    def pause(self, waketime):
        """Pause the current greenlet until `waketime`; return the eventtime."""
        g = greenlet.getcurrent()
        if g is not self._g_dispatch:
            if self._g_dispatch is None:
                return self._sys_pause(waketime)
            # Switch to _check_timers (via g.timer.callback return)
            return self._g_dispatch.switch(waketime)
        # Pausing the dispatch greenlet - prepare a new greenlet to do dispatch
        if self._greenlets:
            g_next = self._greenlets.pop()
        else:
            g_next = ReactorGreenlet(run=self._dispatch_loop)
            self._all_greenlets.append(g_next)
        g_next.parent = g.parent
        g.timer = self.register_timer(g.switch, waketime)
        self._next_timer = self.NOW
        # Switch to _dispatch_loop (via _end_greenlet or direct)
        eventtime = g_next.switch()
        # This greenlet activated from g.timer.callback (via _check_timers)
        return eventtime

    def _end_greenlet(self, g_old):
        """Retire dispatch greenlet `g_old` and hand control back."""
        # Cache this greenlet for later use
        self._greenlets.append(g_old)
        self.unregister_timer(g_old.timer)
        g_old.timer = None
        # Switch to _check_timers (via g_old.timer.callback return)
        self._g_dispatch.switch(self.NEVER)
        # This greenlet reactivated from pause() - return to main dispatch loop
        self._g_dispatch = g_old

    # Mutexes
    def mutex(self, is_locked=False):
        """Return a new ReactorMutex bound to this reactor."""
        return ReactorMutex(self, is_locked)

    # File descriptors
    def register_fd(self, fd, read_callback, write_callback=None):
        """Start watching `fd` for reads; return its ReactorFileHandler."""
        file_handler = ReactorFileHandler(fd, read_callback, write_callback)
        self.set_fd_wake(file_handler, True, False)
        return file_handler

    def unregister_fd(self, file_handler):
        """Stop watching `file_handler` for both reads and writes."""
        if file_handler in self._read_fds:
            self._read_fds.pop(self._read_fds.index(file_handler))
        if file_handler in self._write_fds:
            self._write_fds.pop(self._write_fds.index(file_handler))

    def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
        """Select which events (read and/or write) wake `file_handler`."""
        if file_handler in self._read_fds:
            if not is_readable:
                self._read_fds.pop(self._read_fds.index(file_handler))
        elif is_readable:
            self._read_fds.append(file_handler)
        if file_handler in self._write_fds:
            if not is_writeable:
                self._write_fds.pop(self._write_fds.index(file_handler))
        elif is_writeable:
            self._write_fds.append(file_handler)

    # Main loop
    def _dispatch_loop(self):
        """Run timers and fd callbacks until end() clears `_process`."""
        self._g_dispatch = g_dispatch = greenlet.getcurrent()
        busy = True
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime, busy)
            busy = False
            # Fixed: the original referenced the non-existent attribute
            # "self.write_fds" here, which raised AttributeError.
            res = select.select(self._read_fds, self._write_fds, [], timeout)
            eventtime = self.monotonic()
            for fd in res[0]:
                busy = True
                fd.read_callback(eventtime)
                if g_dispatch is not self._g_dispatch:
                    self._end_greenlet(g_dispatch)
                    eventtime = self.monotonic()
                    break
            for fd in res[1]:
                busy = True
                fd.write_callback(eventtime)
                if g_dispatch is not self._g_dispatch:
                    self._end_greenlet(g_dispatch)
                    eventtime = self.monotonic()
                    break
        self._g_dispatch = None

    def run(self):
        """Start dispatching; returns when the dispatch greenlet switches back."""
        if self._pipe_fds is None:
            self._setup_async_callbacks()
        self._process = True
        g_next = ReactorGreenlet(run=self._dispatch_loop)
        self._all_greenlets.append(g_next)
        g_next.switch()

    def end(self):
        """Request that the dispatch loop stop."""
        self._process = False

    def finalize(self):
        """Terminate all greenlets created by the reactor and close the pipe."""
        self._g_dispatch = None
        self._greenlets = []
        for g in self._all_greenlets:
            try:
                g.throw()
            except:
                # Broad catch kept as-is: a failure is logged and the loop
                # continues with the remaining greenlets.
                logging.exception("reactor finalize greenlet terminate")
        self._all_greenlets = []
        if self._pipe_fds is not None:
            os.close(self._pipe_fds[0])
            os.close(self._pipe_fds[1])
            self._pipe_fds = None


class PollReactor(SelectReactor):
    """Reactor variant that waits for events using select.poll()."""

    def __init__(self, gc_checking=False):
        SelectReactor.__init__(self, gc_checking)
        self._poll = select.poll()
        # Maps file descriptor number -> ReactorFileHandler
        self._fds = {}

    # File descriptors
    def register_fd(self, fd, read_callback, write_callback=None):
        """Start watching `fd` for reads; return its ReactorFileHandler."""
        file_handler = ReactorFileHandler(fd, read_callback, write_callback)
        # Replace the dict rather than mutating it in place
        fds = self._fds.copy()
        fds[fd] = file_handler
        self._fds = fds
        self._poll.register(file_handler, select.POLLIN | select.POLLHUP)
        return file_handler

    def unregister_fd(self, file_handler):
        """Stop watching `file_handler`."""
        self._poll.unregister(file_handler)
        fds = self._fds.copy()
        del fds[file_handler.fd]
        self._fds = fds

    def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
        """Select which events (read and/or write) wake `file_handler`."""
        flags = select.POLLHUP
        if is_readable:
            flags |= select.POLLIN
        if is_writeable:
            flags |= select.POLLOUT
        self._poll.modify(file_handler, flags)

    # Main loop
    def _dispatch_loop(self):
        """Run timers and fd callbacks until end() clears `_process`."""
        self._g_dispatch = g_dispatch = greenlet.getcurrent()
        busy = True
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime, busy)
            busy = False
            # poll() takes milliseconds; round up so a short timeout is
            # not truncated to zero.
            res = self._poll.poll(int(math.ceil(timeout * 1000.0)))
            eventtime = self.monotonic()
            for fd, event in res:
                busy = True
                if event & (select.POLLIN | select.POLLHUP):
                    self._fds[fd].read_callback(eventtime)
                    if g_dispatch is not self._g_dispatch:
                        self._end_greenlet(g_dispatch)
                        eventtime = self.monotonic()
                        break
                if event & select.POLLOUT:
                    self._fds[fd].write_callback(eventtime)
                    if g_dispatch is not self._g_dispatch:
                        self._end_greenlet(g_dispatch)
                        eventtime = self.monotonic()
                        break
        self._g_dispatch = None


class EPollReactor(SelectReactor):
    """Reactor variant that waits for events using select.epoll()."""

    def __init__(self, gc_checking=False):
        SelectReactor.__init__(self, gc_checking)
        self._epoll = select.epoll()
        # Maps file descriptor number -> ReactorFileHandler
        self._fds = {}

    # File descriptors
    def register_fd(self, fd, read_callback, write_callback=None):
        """Start watching `fd` for reads; return its ReactorFileHandler."""
        file_handler = ReactorFileHandler(fd, read_callback, write_callback)
        fds = self._fds.copy()
        # Fixed: store the handler, not the bare read callback, because
        # _dispatch_loop looks up .read_callback/.write_callback on the entry.
        fds[fd] = file_handler
        self._fds = fds
        self._epoll.register(fd, select.EPOLLIN | select.EPOLLHUP)
        return file_handler

    def unregister_fd(self, file_handler):
        """Stop watching `file_handler`."""
        self._epoll.unregister(file_handler.fd)
        fds = self._fds.copy()
        del fds[file_handler.fd]
        self._fds = fds

    def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
        """Select which events (read and/or write) wake `file_handler`."""
        # Use the epoll constant and the raw fd, matching register_fd and
        # unregister_fd.
        flags = select.EPOLLHUP
        if is_readable:
            flags |= select.EPOLLIN
        if is_writeable:
            flags |= select.EPOLLOUT
        self._epoll.modify(file_handler.fd, flags)

    # Main loop
    def _dispatch_loop(self):
        """Run timers and fd callbacks until end() clears `_process`."""
        self._g_dispatch = g_dispatch = greenlet.getcurrent()
        busy = True
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime, busy)
            busy = False
            res = self._epoll.poll(timeout)
            eventtime = self.monotonic()
            for fd, event in res:
                busy = True
                if event & (select.EPOLLIN | select.EPOLLHUP):
                    self._fds[fd].read_callback(eventtime)
                    if g_dispatch is not self._g_dispatch:
                        self._end_greenlet(g_dispatch)
                        eventtime = self.monotonic()
                        break
                if event & select.EPOLLOUT:
                    self._fds[fd].write_callback(eventtime)
                    if g_dispatch is not self._g_dispatch:
                        self._end_greenlet(g_dispatch)
                        eventtime = self.monotonic()
                        break
        self._g_dispatch = None


# Use the poll based reactor if it is available
try:
    select.poll
    Reactor = PollReactor
except AttributeError:
    Reactor = SelectReactor