aboutsummaryrefslogtreecommitdiffstats
path: root/klippy/chelper/serialqueue.c
diff options
context:
space:
mode:
authorKevin O'Connor <kevin@koconnor.net>2021-02-10 11:09:47 -0500
committerKevin O'Connor <kevin@koconnor.net>2021-06-09 18:58:35 -0400
commitc53e8c7d4a945fee54f6ad575381cebec2a87c6f (patch)
treee42b3f40c9eed0be3e6dbe3969300abd5bffba7e /klippy/chelper/serialqueue.c
parent620f77ddb79de683befb458f43c30435759ac0e2 (diff)
downloadkutter-c53e8c7d4a945fee54f6ad575381cebec2a87c6f.tar.gz
kutter-c53e8c7d4a945fee54f6ad575381cebec2a87c6f.tar.xz
kutter-c53e8c7d4a945fee54f6ad575381cebec2a87c6f.zip
serialqueue: Add "fast reader" support
Add ability to run C code directly from the low-level socket reading thread. This enables host based low-latency handlers. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'klippy/chelper/serialqueue.c')
-rw-r--r--klippy/chelper/serialqueue.c67
1 files changed, 56 insertions, 11 deletions
diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c
index 78115e09..bee39837 100644
--- a/klippy/chelper/serialqueue.c
+++ b/klippy/chelper/serialqueue.c
@@ -64,6 +64,9 @@ struct serialqueue {
struct list_head notify_queue;
// Received messages
struct list_head receive_queue;
+ // Fastreader support
+ pthread_mutex_t fast_reader_dispatch_lock;
+ struct list_head fast_readers;
// Debugging
struct list_head old_sent, old_receive;
// Stats
@@ -195,9 +198,11 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
}
// Process a well formed input message
-static int
+static void
handle_message(struct serialqueue *sq, double eventtime, int len)
{
+ pthread_mutex_lock(&sq->lock);
+
// Calculate receive sequence number
uint64_t rseq = ((sq->receive_seq & ~MESSAGE_SEQ_MASK)
| (sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK));
@@ -205,11 +210,15 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
// New sequence number
if (rseq < sq->receive_seq)
rseq += MESSAGE_SEQ_MASK+1;
- if (rseq > sq->send_seq && sq->receive_seq != 1)
+ if (rseq > sq->send_seq && sq->receive_seq != 1) {
// An ack for a message not sent? Out of order message?
- return -1;
+ sq->bytes_invalid += len;
+ pthread_mutex_unlock(&sq->lock);
+ return;
+ }
update_receive_seq(sq, eventtime, rseq);
}
+ sq->bytes_read += len;
// Check for pending messages on notify_queue
int must_wake = 0;
@@ -247,9 +256,26 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
must_wake = 1;
}
+ // Check fast readers
+ struct fastreader *fr;
+ list_for_each_entry(fr, &sq->fast_readers, node) {
+ if (len < fr->prefix_len + MESSAGE_MIN
+ || memcmp(&sq->input_buf[MESSAGE_HEADER_SIZE]
+ , fr->prefix, fr->prefix_len) != 0)
+ continue;
+ // Release main lock and invoke callback
+ pthread_mutex_lock(&sq->fast_reader_dispatch_lock);
+ if (must_wake)
+ check_wake_receive(sq);
+ pthread_mutex_unlock(&sq->lock);
+ fr->func(fr, sq->input_buf, len);
+ pthread_mutex_unlock(&sq->fast_reader_dispatch_lock);
+ return;
+ }
+
if (must_wake)
check_wake_receive(sq);
- return 0;
+ pthread_mutex_unlock(&sq->lock);
}
// Callback for input activity on the serial fd
@@ -288,13 +314,7 @@ input_event(struct serialqueue *sq, double eventtime)
return;
if (len > 0) {
// Received a valid message
- pthread_mutex_lock(&sq->lock);
- int ret = handle_message(sq, eventtime, len);
- if (ret)
- sq->bytes_invalid += len;
- else
- sq->bytes_read += len;
- pthread_mutex_unlock(&sq->lock);
+ handle_message(sq, eventtime, len);
} else {
// Skip bad data at beginning of input
len = -len;
@@ -614,6 +634,7 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id)
list_init(&sq->sent_queue);
list_init(&sq->receive_queue);
list_init(&sq->notify_queue);
+ list_init(&sq->fast_readers);
// Debugging
list_init(&sq->old_sent);
@@ -628,6 +649,9 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id)
ret = pthread_cond_init(&sq->cond, NULL);
if (ret)
goto fail;
+ ret = pthread_mutex_init(&sq->fast_reader_dispatch_lock, NULL);
+ if (ret)
+ goto fail;
ret = pthread_create(&sq->tid, NULL, background_thread, sq);
if (ret)
goto fail;
@@ -700,6 +724,27 @@ serialqueue_free_commandqueue(struct command_queue *cq)
free(cq);
}
+// Add a low-latency message handler
+void
+serialqueue_add_fastreader(struct serialqueue *sq, struct fastreader *fr)
+{
+ pthread_mutex_lock(&sq->lock);
+ list_add_tail(&fr->node, &sq->fast_readers);
+ pthread_mutex_unlock(&sq->lock);
+}
+
+// Remove a previously registered low-latency message handler
+void
+serialqueue_rm_fastreader(struct serialqueue *sq, struct fastreader *fr)
+{
+ pthread_mutex_lock(&sq->lock);
+ list_del(&fr->node);
+ pthread_mutex_unlock(&sq->lock);
+
+ pthread_mutex_lock(&sq->fast_reader_dispatch_lock); // XXX - goofy locking
+ pthread_mutex_unlock(&sq->fast_reader_dispatch_lock);
+}
+
// Add a batch of messages to the given command_queue
void
serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq