aboutsummaryrefslogtreecommitdiffstats
path: root/klippy
diff options
context:
space:
mode:
authorKevin O'Connor <kevin@koconnor.net>2021-02-11 20:43:36 -0500
committerKevin O'Connor <kevin@koconnor.net>2021-06-09 18:58:35 -0400
commit2559a2dd5ad4c5e8341aeddb6e5a59967867cbd7 (patch)
tree28f9613ee76b90d21d0c31a8928ce05978cfa167 /klippy
parent05c2d51a1239a13a7dfeec9cfbb1ea38b7b32aa0 (diff)
downloadkutter-2559a2dd5ad4c5e8341aeddb6e5a59967867cbd7.tar.gz
kutter-2559a2dd5ad4c5e8341aeddb6e5a59967867cbd7.tar.xz
kutter-2559a2dd5ad4c5e8341aeddb6e5a59967867cbd7.zip
pollreactor: Move C pollreactor code from serialqueue.c to its own file
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'klippy')
-rw-r--r--klippy/chelper/__init__.py3
-rw-r--r--klippy/chelper/pollreactor.c179
-rw-r--r--klippy/chelper/pollreactor.h20
-rw-r--r--klippy/chelper/serialqueue.c225
4 files changed, 226 insertions, 201 deletions
diff --git a/klippy/chelper/__init__.py b/klippy/chelper/__init__.py
index 4d199126..07ee190d 100644
--- a/klippy/chelper/__init__.py
+++ b/klippy/chelper/__init__.py
@@ -18,6 +18,7 @@ COMPILE_ARGS = ("-Wall -g -O2 -shared -fPIC"
SSE_FLAGS = "-mfpmath=sse -msse2"
SOURCE_FILES = [
'pyhelper.c', 'serialqueue.c', 'stepcompress.c', 'itersolve.c', 'trapq.c',
+ 'pollreactor.c',
'kin_cartesian.c', 'kin_corexy.c', 'kin_corexz.c', 'kin_delta.c',
'kin_polar.c', 'kin_rotary_delta.c', 'kin_winch.c', 'kin_extruder.c',
'kin_shaper.c',
@@ -25,7 +26,7 @@ SOURCE_FILES = [
DEST_LIB = "c_helper.so"
OTHER_FILES = [
'list.h', 'serialqueue.h', 'stepcompress.h', 'itersolve.h', 'pyhelper.h',
- 'trapq.h',
+ 'trapq.h', 'pollreactor.h',
]
defs_stepcompress = """
diff --git a/klippy/chelper/pollreactor.c b/klippy/chelper/pollreactor.c
new file mode 100644
index 00000000..eb12e5a8
--- /dev/null
+++ b/klippy/chelper/pollreactor.c
@@ -0,0 +1,179 @@
+// Code for dispatching timer and file descriptor events
+//
+// Copyright (C) 2016-2021 Kevin O'Connor <kevin@koconnor.net>
+//
+// This file may be distributed under the terms of the GNU GPLv3 license.
+
+#include <fcntl.h> // fcntl
+#include <math.h> // ceil
+#include <poll.h> // poll
+#include <stdlib.h> // malloc
+#include <string.h> // memset
+#include "pollreactor.h" // pollreactor_alloc
+#include "pyhelper.h" // report_errno
+
+struct pollreactor_timer {
+ double waketime;
+ double (*callback)(void *data, double eventtime);
+};
+
+struct pollreactor {
+ int num_fds, num_timers, must_exit;
+ void *callback_data;
+ double next_timer;
+ struct pollfd *fds;
+ void (**fd_callbacks)(void *data, double eventtime);
+ struct pollreactor_timer *timers;
+};
+
+// Allocate a new 'struct pollreactor' object
+struct pollreactor *
+pollreactor_alloc(int num_fds, int num_timers, void *callback_data)
+{
+ struct pollreactor *pr = malloc(sizeof(*pr));
+ memset(pr, 0, sizeof(*pr));
+ pr->num_fds = num_fds;
+ pr->num_timers = num_timers;
+ pr->must_exit = 0;
+ pr->callback_data = callback_data;
+ pr->next_timer = PR_NEVER;
+ pr->fds = malloc(num_fds * sizeof(*pr->fds));
+ memset(pr->fds, 0, num_fds * sizeof(*pr->fds));
+ pr->fd_callbacks = malloc(num_fds * sizeof(*pr->fd_callbacks));
+ memset(pr->fd_callbacks, 0, num_fds * sizeof(*pr->fd_callbacks));
+ pr->timers = malloc(num_timers * sizeof(*pr->timers));
+ memset(pr->timers, 0, num_timers * sizeof(*pr->timers));
+ int i;
+ for (i=0; i<num_timers; i++)
+ pr->timers[i].waketime = PR_NEVER;
+ return pr;
+}
+
+// Free resources associated with a 'struct pollreactor' object
+void
+pollreactor_free(struct pollreactor *pr)
+{
+ free(pr->fds);
+ pr->fds = NULL;
+ free(pr->fd_callbacks);
+ pr->fd_callbacks = NULL;
+ free(pr->timers);
+ pr->timers = NULL;
+ free(pr);
+}
+
+// Add a callback for when a file descriptor (fd) becomes readable
+void
+pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback
+ , int write_only)
+{
+ pr->fds[pos].fd = fd;
+ pr->fds[pos].events = POLLHUP | (write_only ? 0 : POLLIN);
+ pr->fds[pos].revents = 0;
+ pr->fd_callbacks[pos] = callback;
+}
+
+// Add a timer callback
+void
+pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback)
+{
+ pr->timers[pos].callback = callback;
+ pr->timers[pos].waketime = PR_NEVER;
+}
+
+// Return the last schedule wake-up time for a timer
+double
+pollreactor_get_timer(struct pollreactor *pr, int pos)
+{
+ return pr->timers[pos].waketime;
+}
+
+// Set the wake-up time for a given timer
+void
+pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime)
+{
+ pr->timers[pos].waketime = waketime;
+ if (waketime < pr->next_timer)
+ pr->next_timer = waketime;
+}
+
+// Internal code to invoke timer callbacks
+static int
+pollreactor_check_timers(struct pollreactor *pr, double eventtime, int busy)
+{
+ if (eventtime >= pr->next_timer) {
+ // Find and run pending timers
+ pr->next_timer = PR_NEVER;
+ int i;
+ for (i=0; i<pr->num_timers; i++) {
+ struct pollreactor_timer *timer = &pr->timers[i];
+ double t = timer->waketime;
+ if (eventtime >= t) {
+ busy = 1;
+ t = timer->callback(pr->callback_data, eventtime);
+ timer->waketime = t;
+ }
+ if (t < pr->next_timer)
+ pr->next_timer = t;
+ }
+ }
+ if (busy)
+ return 0;
+ // Calculate sleep duration
+ double timeout = ceil((pr->next_timer - eventtime) * 1000.);
+ return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout);
+}
+
+// Repeatedly check for timer and fd events and invoke their callbacks
+void
+pollreactor_run(struct pollreactor *pr)
+{
+ double eventtime = get_monotonic();
+ int busy = 1;
+ while (! pr->must_exit) {
+ int timeout = pollreactor_check_timers(pr, eventtime, busy);
+ busy = 0;
+ int ret = poll(pr->fds, pr->num_fds, timeout);
+ eventtime = get_monotonic();
+ if (ret > 0) {
+ busy = 1;
+ int i;
+ for (i=0; i<pr->num_fds; i++)
+ if (pr->fds[i].revents)
+ pr->fd_callbacks[i](pr->callback_data, eventtime);
+ } else if (ret < 0) {
+ report_errno("poll", ret);
+ pr->must_exit = 1;
+ }
+ }
+}
+
+// Request that a currently running pollreactor_run() loop exit
+void
+pollreactor_do_exit(struct pollreactor *pr)
+{
+ pr->must_exit = 1;
+}
+
+// Check if a pollreactor_run() loop has been requested to exit
+int
+pollreactor_is_exit(struct pollreactor *pr)
+{
+ return pr->must_exit;
+}
+
+int
+fd_set_non_blocking(int fd)
+{
+ int flags = fcntl(fd, F_GETFL);
+ if (flags < 0) {
+ report_errno("fcntl getfl", flags);
+ return -1;
+ }
+ int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ if (ret < 0) {
+ report_errno("fcntl setfl", flags);
+ return -1;
+ }
+ return 0;
+}
diff --git a/klippy/chelper/pollreactor.h b/klippy/chelper/pollreactor.h
new file mode 100644
index 00000000..97d7d3a9
--- /dev/null
+++ b/klippy/chelper/pollreactor.h
@@ -0,0 +1,20 @@
+#ifndef POLLREACTOR_H
+#define POLLREACTOR_H
+
+#define PR_NOW 0.
+#define PR_NEVER 9999999999999999.
+
+struct pollreactor *pollreactor_alloc(int num_fds, int num_timers
+ , void *callback_data);
+void pollreactor_free(struct pollreactor *pr);
+void pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback
+ , int write_only);
+void pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback);
+double pollreactor_get_timer(struct pollreactor *pr, int pos);
+void pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime);
+void pollreactor_run(struct pollreactor *pr);
+void pollreactor_do_exit(struct pollreactor *pr);
+int pollreactor_is_exit(struct pollreactor *pr);
+int fd_set_non_blocking(int fd);
+
+#endif // pollreactor.h
diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c
index 77d547bb..38f65199 100644
--- a/klippy/chelper/serialqueue.c
+++ b/klippy/chelper/serialqueue.c
@@ -12,10 +12,8 @@
// clock times, prioritizes commands, and handles retransmissions. A
// background thread is launched to do this work and minimize latency.
-#include <fcntl.h> // fcntl
#include <linux/can.h> // // struct can_frame
-#include <math.h> // ceil
-#include <poll.h> // poll
+#include <math.h> // fabs
#include <pthread.h> // pthread_mutex_lock
#include <stddef.h> // offsetof
#include <stdint.h> // uint64_t
@@ -26,185 +24,12 @@
#include <unistd.h> // pipe
#include "compiler.h" // __visible
#include "list.h" // list_add_tail
+#include "pollreactor.h" // pollreactor_alloc
#include "pyhelper.h" // get_monotonic
#include "serialqueue.h" // struct queue_message
/****************************************************************
- * Poll reactor
- ****************************************************************/
-
-// The 'poll reactor' code is a mechanism for dispatching timer and
-// file descriptor events.
-
-#define PR_NOW 0.
-#define PR_NEVER 9999999999999999.
-
-struct pollreactor_timer {
- double waketime;
- double (*callback)(void *data, double eventtime);
-};
-
-struct pollreactor {
- int num_fds, num_timers, must_exit;
- void *callback_data;
- double next_timer;
- struct pollfd *fds;
- void (**fd_callbacks)(void *data, double eventtime);
- struct pollreactor_timer *timers;
-};
-
-// Allocate a new 'struct pollreactor' object
-static void
-pollreactor_setup(struct pollreactor *pr, int num_fds, int num_timers
- , void *callback_data)
-{
- pr->num_fds = num_fds;
- pr->num_timers = num_timers;
- pr->must_exit = 0;
- pr->callback_data = callback_data;
- pr->next_timer = PR_NEVER;
- pr->fds = malloc(num_fds * sizeof(*pr->fds));
- memset(pr->fds, 0, num_fds * sizeof(*pr->fds));
- pr->fd_callbacks = malloc(num_fds * sizeof(*pr->fd_callbacks));
- memset(pr->fd_callbacks, 0, num_fds * sizeof(*pr->fd_callbacks));
- pr->timers = malloc(num_timers * sizeof(*pr->timers));
- memset(pr->timers, 0, num_timers * sizeof(*pr->timers));
- int i;
- for (i=0; i<num_timers; i++)
- pr->timers[i].waketime = PR_NEVER;
-}
-
-// Free resources associated with a 'struct pollreactor' object
-static void
-pollreactor_free(struct pollreactor *pr)
-{
- free(pr->fds);
- pr->fds = NULL;
- free(pr->fd_callbacks);
- pr->fd_callbacks = NULL;
- free(pr->timers);
- pr->timers = NULL;
-}
-
-// Add a callback for when a file descriptor (fd) becomes readable
-static void
-pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback
- , int write_only)
-{
- pr->fds[pos].fd = fd;
- pr->fds[pos].events = POLLHUP | (write_only ? 0 : POLLIN);
- pr->fds[pos].revents = 0;
- pr->fd_callbacks[pos] = callback;
-}
-
-// Add a timer callback
-static void
-pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback)
-{
- pr->timers[pos].callback = callback;
- pr->timers[pos].waketime = PR_NEVER;
-}
-
-// Return the last schedule wake-up time for a timer
-static double
-pollreactor_get_timer(struct pollreactor *pr, int pos)
-{
- return pr->timers[pos].waketime;
-}
-
-// Set the wake-up time for a given timer
-static void
-pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime)
-{
- pr->timers[pos].waketime = waketime;
- if (waketime < pr->next_timer)
- pr->next_timer = waketime;
-}
-
-// Internal code to invoke timer callbacks
-static int
-pollreactor_check_timers(struct pollreactor *pr, double eventtime, int busy)
-{
- if (eventtime >= pr->next_timer) {
- // Find and run pending timers
- pr->next_timer = PR_NEVER;
- int i;
- for (i=0; i<pr->num_timers; i++) {
- struct pollreactor_timer *timer = &pr->timers[i];
- double t = timer->waketime;
- if (eventtime >= t) {
- busy = 1;
- t = timer->callback(pr->callback_data, eventtime);
- timer->waketime = t;
- }
- if (t < pr->next_timer)
- pr->next_timer = t;
- }
- }
- if (busy)
- return 0;
- // Calculate sleep duration
- double timeout = ceil((pr->next_timer - eventtime) * 1000.);
- return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout);
-}
-
-// Repeatedly check for timer and fd events and invoke their callbacks
-static void
-pollreactor_run(struct pollreactor *pr)
-{
- double eventtime = get_monotonic();
- int busy = 1;
- while (! pr->must_exit) {
- int timeout = pollreactor_check_timers(pr, eventtime, busy);
- busy = 0;
- int ret = poll(pr->fds, pr->num_fds, timeout);
- eventtime = get_monotonic();
- if (ret > 0) {
- busy = 1;
- int i;
- for (i=0; i<pr->num_fds; i++)
- if (pr->fds[i].revents)
- pr->fd_callbacks[i](pr->callback_data, eventtime);
- } else if (ret < 0) {
- report_errno("poll", ret);
- pr->must_exit = 1;
- }
- }
-}
-
-// Request that a currently running pollreactor_run() loop exit
-static void
-pollreactor_do_exit(struct pollreactor *pr)
-{
- pr->must_exit = 1;
-}
-
-// Check if a pollreactor_run() loop has been requested to exit
-static int
-pollreactor_is_exit(struct pollreactor *pr)
-{
- return pr->must_exit;
-}
-
-static int
-set_non_blocking(int fd)
-{
- int flags = fcntl(fd, F_GETFL);
- if (flags < 0) {
- report_errno("fcntl getfl", flags);
- return -1;
- }
- int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
- if (ret < 0) {
- report_errno("fcntl setfl", flags);
- return -1;
- }
- return 0;
-}
-
-
-/****************************************************************
* Serial protocol helpers
****************************************************************/
@@ -354,7 +179,7 @@ message_queue_free(struct list_head *root)
struct serialqueue {
// Input reading
- struct pollreactor pr;
+ struct pollreactor *pr;
int serial_fd, serial_fd_type, client_id;
int pipe_fds[2];
uint8_t input_buf[4096];
@@ -479,7 +304,7 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
}
}
sq->receive_seq = rseq;
- pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW);
+ pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW);
// Update retransmit info
if (sq->rtt_sample_seq && rseq > sq->rtt_sample_seq
@@ -504,12 +329,12 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
sq->rtt_sample_seq = 0;
}
if (list_empty(&sq->sent_queue)) {
- pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NEVER);
+ pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, PR_NEVER);
} else {
struct queue_message *sent = list_first_entry(
&sq->sent_queue, struct queue_message, node);
double nr = eventtime + sq->rto + sent->len * sq->baud_adjust;
- pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, nr);
+ pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, nr);
}
}
@@ -554,7 +379,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
sq->last_ack_seq = rseq;
else if (rseq > sq->ignore_nak_seq && !list_empty(&sq->sent_queue))
// Duplicate Ack is a Nak - do fast retransmit
- pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW);
+ pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT, PR_NOW);
} else {
// Data message - add to receive queue
struct queue_message *qm = message_fill(sq->input_buf, len);
@@ -580,7 +405,7 @@ input_event(struct serialqueue *sq, double eventtime)
int ret = read(sq->serial_fd, &cf, sizeof(cf));
if (ret <= 0) {
report_errno("can read", ret);
- pollreactor_do_exit(&sq->pr);
+ pollreactor_do_exit(sq->pr);
return;
}
if (cf.can_id != sq->client_id + 1)
@@ -595,7 +420,7 @@ input_event(struct serialqueue *sq, double eventtime)
report_errno("read", ret);
else
errorf("Got EOF when reading from device");
- pollreactor_do_exit(&sq->pr);
+ pollreactor_do_exit(sq->pr);
return;
}
sq->input_pos += ret;
@@ -635,7 +460,7 @@ kick_event(struct serialqueue *sq, double eventtime)
int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy));
if (ret < 0)
report_errno("pipe read", ret);
- pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW);
+ pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW);
}
static void
@@ -691,7 +516,7 @@ retransmit_event(struct serialqueue *sq, double eventtime)
sq->bytes_retransmit += buflen;
// Update rto
- if (pollreactor_get_timer(&sq->pr, SQPT_RETRANSMIT) == PR_NOW) {
+ if (pollreactor_get_timer(sq->pr, SQPT_RETRANSMIT) == PR_NOW) {
// Retransmit due to nak
sq->ignore_nak_seq = sq->receive_seq;
if (sq->receive_seq < sq->retransmit_seq)
@@ -771,7 +596,7 @@ build_and_send_command(struct serialqueue *sq, uint8_t *buf, double eventtime)
out->sent_time = eventtime;
out->receive_time = sq->idle_time;
if (list_empty(&sq->sent_queue))
- pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT
+ pollreactor_update_timer(sq->pr, SQPT_RETRANSMIT
, sq->idle_time + sq->rto);
if (!sq->rtt_sample_seq)
sq->rtt_sample_seq = sq->send_seq;
@@ -886,7 +711,7 @@ static void *
background_thread(void *data)
{
struct serialqueue *sq = data;
- pollreactor_run(&sq->pr);
+ pollreactor_run(sq->pr);
pthread_mutex_lock(&sq->lock);
check_wake_receive(sq);
@@ -910,15 +735,15 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id)
goto fail;
// Reactor setup
- pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq);
- pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event
+ sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq);
+ pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event
, serial_fd_type==SQT_DEBUGFILE);
- pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0);
- pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event);
- pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event);
- set_non_blocking(serial_fd);
- set_non_blocking(sq->pipe_fds[0]);
- set_non_blocking(sq->pipe_fds[1]);
+ pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0);
+ pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event);
+ pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event);
+ fd_set_non_blocking(serial_fd);
+ fd_set_non_blocking(sq->pipe_fds[0]);
+ fd_set_non_blocking(sq->pipe_fds[1]);
// Retransmit setup
sq->send_seq = 1;
@@ -966,7 +791,7 @@ fail:
void __visible
serialqueue_exit(struct serialqueue *sq)
{
- pollreactor_do_exit(&sq->pr);
+ pollreactor_do_exit(sq->pr);
kick_bg_thread(sq);
int ret = pthread_join(sq->tid, NULL);
if (ret)
@@ -979,7 +804,7 @@ serialqueue_free(struct serialqueue *sq)
{
if (!sq)
return;
- if (!pollreactor_is_exit(&sq->pr))
+ if (!pollreactor_is_exit(sq->pr))
serialqueue_exit(sq);
pthread_mutex_lock(&sq->lock);
message_queue_free(&sq->sent_queue);
@@ -995,7 +820,7 @@ serialqueue_free(struct serialqueue *sq)
message_queue_free(&cq->stalled_queue);
}
pthread_mutex_unlock(&sq->lock);
- pollreactor_free(&sq->pr);
+ pollreactor_free(sq->pr);
free(sq);
}
@@ -1085,7 +910,7 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
pthread_mutex_lock(&sq->lock);
// Wait for message to be available
while (list_empty(&sq->receive_queue)) {
- if (pollreactor_is_exit(&sq->pr))
+ if (pollreactor_is_exit(sq->pr))
goto exit;
sq->receive_waiting = 1;
int ret = pthread_cond_wait(&sq->cond, &sq->lock);