diff options
author | Kevin O'Connor <kevin@koconnor.net> | 2018-04-30 11:22:16 -0400 |
---|---|---|
committer | Kevin O'Connor <kevin@koconnor.net> | 2018-04-30 11:44:53 -0400 |
commit | 15248706ae3950ce89ea595c72349b3738983f41 (patch) | |
tree | a0bc14c73de7a0844c4a3a43381bc93334fa3dbb /klippy/serialqueue.c | |
parent | 8e1b516eb6e47eb3aa295f2b872fa60a530274e1 (diff) | |
download | kutter-15248706ae3950ce89ea595c72349b3738983f41.tar.gz kutter-15248706ae3950ce89ea595c72349b3738983f41.tar.xz kutter-15248706ae3950ce89ea595c72349b3738983f41.zip |
chelper: Move the host C code to a new klippy/chelper/ directory
Move the C code out of the main klippy/ directory and into its own
directory. This reduces the clutter in the main klippy directory.
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
Diffstat (limited to 'klippy/serialqueue.c')
-rw-r--r-- | klippy/serialqueue.c | 1090 |
1 files changed, 0 insertions, 1090 deletions
diff --git a/klippy/serialqueue.c b/klippy/serialqueue.c deleted file mode 100644 index 2f7c25a5..00000000 --- a/klippy/serialqueue.c +++ /dev/null @@ -1,1090 +0,0 @@ -// Serial port command queuing -// -// Copyright (C) 2016 Kevin O'Connor <kevin@koconnor.net> -// -// This file may be distributed under the terms of the GNU GPLv3 license. -// -// This goal of this code is to handle low-level serial port -// communications with a microcontroller (mcu). This code is written -// in C (instead of python) to reduce communication latencies and to -// reduce scheduling jitter. The code queues messages to be -// transmitted, schedules transmission of commands at specified mcu -// clock times, prioritizes commands, and handles retransmissions. A -// background thread is launched to do this work and minimize latency. - -#include <fcntl.h> // fcntl -#include <math.h> // ceil -#include <poll.h> // poll -#include <pthread.h> // pthread_mutex_lock -#include <stddef.h> // offsetof -#include <stdint.h> // uint64_t -#include <stdio.h> // snprintf -#include <stdlib.h> // malloc -#include <string.h> // memset -#include <termios.h> // tcflush -#include <unistd.h> // pipe -#include "list.h" // list_add_tail -#include "pyhelper.h" // get_monotonic -#include "serialqueue.h" // struct queue_message - - -/**************************************************************** - * Poll reactor - ****************************************************************/ - -// The 'poll reactor' code is a mechanism for dispatching timer and -// file descriptor events. - -#define PR_NOW 0. -#define PR_NEVER 9999999999999999. - -struct pollreactor_timer { - double waketime; - double (*callback)(void *data, double eventtime); -}; - -struct pollreactor { - int num_fds, num_timers, must_exit; - void *callback_data; - double next_timer; - struct pollfd *fds; - void (**fd_callbacks)(void *data, double eventtime); - struct pollreactor_timer *timers; -}; - -// Allocate a new 'struct pollreactor' object -static void -pollreactor_setup(struct pollreactor *pr, int num_fds, int num_timers - , void *callback_data) -{ - pr->num_fds = num_fds; - pr->num_timers = num_timers; - pr->must_exit = 0; - pr->callback_data = callback_data; - pr->next_timer = PR_NEVER; - pr->fds = malloc(num_fds * sizeof(*pr->fds)); - memset(pr->fds, 0, num_fds * sizeof(*pr->fds)); - pr->fd_callbacks = malloc(num_fds * sizeof(*pr->fd_callbacks)); - memset(pr->fd_callbacks, 0, num_fds * sizeof(*pr->fd_callbacks)); - pr->timers = malloc(num_timers * sizeof(*pr->timers)); - memset(pr->timers, 0, num_timers * sizeof(*pr->timers)); - int i; - for (i=0; i<num_timers; i++) - pr->timers[i].waketime = PR_NEVER; -} - -// Free resources associated with a 'struct pollreactor' object -static void -pollreactor_free(struct pollreactor *pr) -{ - free(pr->fds); - pr->fds = NULL; - free(pr->fd_callbacks); - pr->fd_callbacks = NULL; - free(pr->timers); - pr->timers = NULL; -} - -// Add a callback for when a file descriptor (fd) becomes readable -static void -pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback) -{ - pr->fds[pos].fd = fd; - pr->fds[pos].events = POLLIN|POLLHUP; - pr->fds[pos].revents = 0; - pr->fd_callbacks[pos] = callback; -} - -// Add a timer callback -static void -pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback) -{ - pr->timers[pos].callback = callback; - pr->timers[pos].waketime = PR_NEVER; -} - -// Return the last schedule wake-up time for a timer -static double -pollreactor_get_timer(struct pollreactor *pr, int pos) -{ - return pr->timers[pos].waketime; -} - -// Set the wake-up time for a given timer -static void -pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime) -{ - pr->timers[pos].waketime = waketime; - if (waketime < pr->next_timer) - pr->next_timer = waketime; -} - -// Internal code to invoke timer callbacks -static int -pollreactor_check_timers(struct pollreactor *pr, double eventtime) -{ - if (eventtime >= pr->next_timer) { - pr->next_timer = PR_NEVER; - int i; - for (i=0; i<pr->num_timers; i++) { - struct pollreactor_timer *timer = &pr->timers[i]; - double t = timer->waketime; - if (eventtime >= t) { - t = timer->callback(pr->callback_data, eventtime); - timer->waketime = t; - } - if (t < pr->next_timer) - pr->next_timer = t; - } - if (eventtime >= pr->next_timer) - return 0; - } - double timeout = ceil((pr->next_timer - eventtime) * 1000.); - return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout); -} - -// Repeatedly check for timer and fd events and invoke their callbacks -static void -pollreactor_run(struct pollreactor *pr) -{ - double eventtime = get_monotonic(); - while (! pr->must_exit) { - int timeout = pollreactor_check_timers(pr, eventtime); - int ret = poll(pr->fds, pr->num_fds, timeout); - eventtime = get_monotonic(); - if (ret > 0) { - int i; - for (i=0; i<pr->num_fds; i++) - if (pr->fds[i].revents) - pr->fd_callbacks[i](pr->callback_data, eventtime); - } else if (ret < 0) { - report_errno("poll", ret); - pr->must_exit = 1; - } - } -} - -// Request that a currently running pollreactor_run() loop exit -static void -pollreactor_do_exit(struct pollreactor *pr) -{ - pr->must_exit = 1; -} - -// Check if a pollreactor_run() loop has been requested to exit -static int -pollreactor_is_exit(struct pollreactor *pr) -{ - return pr->must_exit; -} - -static int -set_non_blocking(int fd) -{ - int flags = fcntl(fd, F_GETFL); - if (flags < 0) { - report_errno("fcntl getfl", flags); - return -1; - } - int ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - if (ret < 0) { - report_errno("fcntl setfl", flags); - return -1; - } - return 0; -} - - -/**************************************************************** - * Serial protocol helpers - ****************************************************************/ - -// Implement the standard crc "ccitt" algorithm on the given buffer -static uint16_t -crc16_ccitt(uint8_t *buf, uint8_t len) -{ - uint16_t crc = 0xffff; - while (len--) { - uint8_t data = *buf++; - data ^= crc & 0xff; - data ^= data << 4; - crc = ((((uint16_t)data << 8) | (crc >> 8)) ^ (uint8_t)(data >> 4) - ^ ((uint16_t)data << 3)); - } - return crc; -} - -// Verify a buffer starts with a valid mcu message -static int -check_message(uint8_t *need_sync, uint8_t *buf, int buf_len) -{ - if (buf_len < MESSAGE_MIN) - // Need more data - return 0; - if (*need_sync) - goto error; - uint8_t msglen = buf[MESSAGE_POS_LEN]; - if (msglen < MESSAGE_MIN || msglen > MESSAGE_MAX) - goto error; - uint8_t msgseq = buf[MESSAGE_POS_SEQ]; - if ((msgseq & ~MESSAGE_SEQ_MASK) != MESSAGE_DEST) - goto error; - if (buf_len < msglen) - // Need more data - return 0; - if (buf[msglen-MESSAGE_TRAILER_SYNC] != MESSAGE_SYNC) - goto error; - uint16_t msgcrc = ((buf[msglen-MESSAGE_TRAILER_CRC] << 8) - | (uint8_t)buf[msglen-MESSAGE_TRAILER_CRC+1]); - uint16_t crc = crc16_ccitt(buf, msglen-MESSAGE_TRAILER_SIZE); - if (crc != msgcrc) - goto error; - return msglen; - -error: ; - // Discard bytes until next SYNC found - uint8_t *next_sync = memchr(buf, MESSAGE_SYNC, buf_len); - if (next_sync) { - *need_sync = 0; - return -(next_sync - buf + 1); - } - *need_sync = 1; - return -buf_len; -} - -// Encode an integer as a variable length quantity (vlq) -static uint8_t * -encode_int(uint8_t *p, uint32_t v) -{ - int32_t sv = v; - if (sv < (3L<<5) && sv >= -(1L<<5)) goto f4; - if (sv < (3L<<12) && sv >= -(1L<<12)) goto f3; - if (sv < (3L<<19) && sv >= -(1L<<19)) goto f2; - if (sv < (3L<<26) && sv >= -(1L<<26)) goto f1; - *p++ = (v>>28) | 0x80; -f1: *p++ = ((v>>21) & 0x7f) | 0x80; -f2: *p++ = ((v>>14) & 0x7f) | 0x80; -f3: *p++ = ((v>>7) & 0x7f) | 0x80; -f4: *p++ = v & 0x7f; - return p; -} - - -/**************************************************************** - * Command queues - ****************************************************************/ - -struct command_queue { - struct list_head stalled_queue, ready_queue; - struct list_node node; -}; - -// Allocate a 'struct queue_message' object -static struct queue_message * -message_alloc(void) -{ - struct queue_message *qm = malloc(sizeof(*qm)); - memset(qm, 0, sizeof(*qm)); - return qm; -} - -// Allocate a queue_message and fill it with the specified data -static struct queue_message * -message_fill(uint8_t *data, int len) -{ - struct queue_message *qm = message_alloc(); - memcpy(qm->msg, data, len); - qm->len = len; - return qm; -} - -// Allocate a queue_message and fill it with a series of encoded vlq integers -struct queue_message * -message_alloc_and_encode(uint32_t *data, int len) -{ - struct queue_message *qm = message_alloc(); - int i; - uint8_t *p = qm->msg; - for (i=0; i<len; i++) { - p = encode_int(p, data[i]); - if (p > &qm->msg[MESSAGE_PAYLOAD_MAX]) - goto fail; - } - qm->len = p - qm->msg; - return qm; - -fail: - errorf("Encode error"); - qm->len = 0; - return qm; -} - -// Free the storage from a previous message_alloc() call -static void -message_free(struct queue_message *qm) -{ - free(qm); -} - -// Free all the messages on a queue -void -message_queue_free(struct list_head *root) -{ - while (!list_empty(root)) { - struct queue_message *qm = list_first_entry( - root, struct queue_message, node); - list_del(&qm->node); - message_free(qm); - } -} - - -/**************************************************************** - * Serialqueue interface - ****************************************************************/ - -struct serialqueue { - // Input reading - struct pollreactor pr; - int serial_fd; - int pipe_fds[2]; - uint8_t input_buf[4096]; - uint8_t need_sync; - int input_pos; - // Threading - pthread_t tid; - pthread_mutex_t lock; // protects variables below - pthread_cond_t cond; - int receive_waiting; - // Baud / clock tracking - double baud_adjust, idle_time; - double est_freq, last_clock_time; - uint64_t last_clock; - double last_receive_sent_time; - // Retransmit support - uint64_t send_seq, receive_seq; - uint64_t ignore_nak_seq, retransmit_seq, rtt_sample_seq; - struct list_head sent_queue; - double srtt, rttvar, rto; - // Pending transmission message queues - struct list_head pending_queues; - int ready_bytes, stalled_bytes, need_ack_bytes; - uint64_t need_kick_clock; - // Received messages - struct list_head receive_queue; - // Debugging - struct list_head old_sent, old_receive; - // Stats - uint32_t bytes_write, bytes_read, bytes_retransmit, bytes_invalid; -}; - -#define SQPF_SERIAL 0 -#define SQPF_PIPE 1 -#define SQPF_NUM 2 - -#define SQPT_RETRANSMIT 0 -#define SQPT_COMMAND 1 -#define SQPT_NUM 2 - -#define MIN_RTO 0.025 -#define MAX_RTO 5.000 -#define MIN_REQTIME_DELTA 0.250 -#define MIN_BACKGROUND_DELTA 0.005 -#define IDLE_QUERY_TIME 1.0 - -#define DEBUG_QUEUE_SENT 100 -#define DEBUG_QUEUE_RECEIVE 20 - -// Create a series of empty messages and add them to a list -static void -debug_queue_alloc(struct list_head *root, int count) -{ - int i; - for (i=0; i<count; i++) { - struct queue_message *qm = message_alloc(); - list_add_head(&qm->node, root); - } -} - -// Copy a message to a debug queue and free old debug messages -static void -debug_queue_add(struct list_head *root, struct queue_message *qm) -{ - list_add_tail(&qm->node, root); - struct queue_message *old = list_first_entry( - root, struct queue_message, node); - list_del(&old->node); - message_free(old); -} - -// Wake up the receiver thread if it is waiting -static void -check_wake_receive(struct serialqueue *sq) -{ - if (sq->receive_waiting) { - sq->receive_waiting = 0; - pthread_cond_signal(&sq->cond); - } -} - -// Write to the internal pipe to wake the background thread if in poll -static void -kick_bg_thread(struct serialqueue *sq) -{ - int ret = write(sq->pipe_fds[1], ".", 1); - if (ret < 0) - report_errno("pipe write", ret); -} - -// Update internal state when the receive sequence increases -static void -update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq) -{ - // Remove from sent queue - uint64_t sent_seq = sq->receive_seq; - for (;;) { - struct queue_message *sent = list_first_entry( - &sq->sent_queue, struct queue_message, node); - if (list_empty(&sq->sent_queue)) { - // Got an ack for a message not sent; must be connection init - sq->send_seq = rseq; - sq->last_receive_sent_time = 0.; - break; - } - sq->need_ack_bytes -= sent->len; - list_del(&sent->node); - debug_queue_add(&sq->old_sent, sent); - sent_seq++; - if (rseq == sent_seq) { - // Found sent message corresponding with the received sequence - sq->last_receive_sent_time = sent->receive_time; - break; - } - } - sq->receive_seq = rseq; - pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW); - - // Update retransmit info - if (sq->rtt_sample_seq && rseq > sq->rtt_sample_seq - && sq->last_receive_sent_time) { - // RFC6298 rtt calculations - double delta = eventtime - sq->last_receive_sent_time; - if (!sq->srtt) { - sq->rttvar = delta / 2.0; - sq->srtt = delta * 10.0; // use a higher start default - } else { - sq->rttvar = (3.0 * sq->rttvar + fabs(sq->srtt - delta)) / 4.0; - sq->srtt = (7.0 * sq->srtt + delta) / 8.0; - } - double rttvar4 = sq->rttvar * 4.0; - if (rttvar4 < 0.001) - rttvar4 = 0.001; - sq->rto = sq->srtt + rttvar4; - if (sq->rto < MIN_RTO) - sq->rto = MIN_RTO; - else if (sq->rto > MAX_RTO) - sq->rto = MAX_RTO; - sq->rtt_sample_seq = 0; - } - if (list_empty(&sq->sent_queue)) { - pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NEVER); - } else { - struct queue_message *sent = list_first_entry( - &sq->sent_queue, struct queue_message, node); - double nr = eventtime + sq->rto + sent->len * sq->baud_adjust; - pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, nr); - } -} - -// Process a well formed input message -static void -handle_message(struct serialqueue *sq, double eventtime, int len) -{ - // Calculate receive sequence number - uint64_t rseq = ((sq->receive_seq & ~MESSAGE_SEQ_MASK) - | (sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK)); - if (rseq < sq->receive_seq) - rseq += MESSAGE_SEQ_MASK+1; - - if (rseq != sq->receive_seq) - // New sequence number - update_receive_seq(sq, eventtime, rseq); - else if (len == MESSAGE_MIN && rseq > sq->ignore_nak_seq - && !list_empty(&sq->sent_queue)) - // Duplicate sequence number in an empty message is a nak - pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW); - - if (len > MESSAGE_MIN) { - // Add message to receive queue - struct queue_message *qm = message_fill(sq->input_buf, len); - qm->sent_time = (rseq > sq->retransmit_seq - ? sq->last_receive_sent_time : 0.); - qm->receive_time = get_monotonic(); // must be time post read() - qm->receive_time -= sq->baud_adjust * len; - list_add_tail(&qm->node, &sq->receive_queue); - check_wake_receive(sq); - } -} - -// Callback for input activity on the serial fd -static void -input_event(struct serialqueue *sq, double eventtime) -{ - int ret = read(sq->serial_fd, &sq->input_buf[sq->input_pos] - , sizeof(sq->input_buf) - sq->input_pos); - if (ret <= 0) { - report_errno("read", ret); - pollreactor_do_exit(&sq->pr); - return; - } - sq->input_pos += ret; - for (;;) { - ret = check_message(&sq->need_sync, sq->input_buf, sq->input_pos); - if (!ret) - // Need more data - return; - if (ret > 0) { - // Received a valid message - pthread_mutex_lock(&sq->lock); - handle_message(sq, eventtime, ret); - sq->bytes_read += ret; - pthread_mutex_unlock(&sq->lock); - } else { - // Skip bad data at beginning of input - ret = -ret; - pthread_mutex_lock(&sq->lock); - sq->bytes_invalid += ret; - pthread_mutex_unlock(&sq->lock); - } - sq->input_pos -= ret; - if (sq->input_pos) - memmove(sq->input_buf, &sq->input_buf[ret], sq->input_pos); - } -} - -// Callback for input activity on the pipe fd (wakes command_event) -static void -kick_event(struct serialqueue *sq, double eventtime) -{ - char dummy[4096]; - int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy)); - if (ret < 0) - report_errno("pipe read", ret); - pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW); -} - -// Callback timer for when a retransmit should be done -static double -retransmit_event(struct serialqueue *sq, double eventtime) -{ - int ret = tcflush(sq->serial_fd, TCOFLUSH); - if (ret < 0) - report_errno("tcflush", ret); - - pthread_mutex_lock(&sq->lock); - - // Retransmit all pending messages - uint8_t buf[MESSAGE_MAX * MESSAGE_SEQ_MASK + 1]; - int buflen = 0, first_buflen = 0; - buf[buflen++] = MESSAGE_SYNC; - struct queue_message *qm; - list_for_each_entry(qm, &sq->sent_queue, node) { - memcpy(&buf[buflen], qm->msg, qm->len); - buflen += qm->len; - if (!first_buflen) - first_buflen = qm->len + 1; - } - ret = write(sq->serial_fd, buf, buflen); - if (ret < 0) - report_errno("retransmit write", ret); - sq->bytes_retransmit += buflen; - - // Update rto - if (pollreactor_get_timer(&sq->pr, SQPT_RETRANSMIT) == PR_NOW) { - // Retransmit due to nak - sq->ignore_nak_seq = sq->receive_seq; - if (sq->receive_seq < sq->retransmit_seq) - // Second nak for this retransmit - don't allow third - sq->ignore_nak_seq = sq->retransmit_seq; - } else { - // Retransmit due to timeout - sq->rto *= 2.0; - if (sq->rto > MAX_RTO) - sq->rto = MAX_RTO; - sq->ignore_nak_seq = sq->send_seq; - } - sq->retransmit_seq = sq->send_seq; - sq->rtt_sample_seq = 0; - sq->idle_time = eventtime + buflen * sq->baud_adjust; - double waketime = eventtime + first_buflen * sq->baud_adjust + sq->rto; - - pthread_mutex_unlock(&sq->lock); - return waketime; -} - -// Construct a block of data and send to the serial port -static void -build_and_send_command(struct serialqueue *sq, double eventtime) -{ - struct queue_message *out = message_alloc(); - out->len = MESSAGE_HEADER_SIZE; - - while (sq->ready_bytes) { - // Find highest priority message (message with lowest req_clock) - uint64_t min_clock = MAX_CLOCK; - struct command_queue *q, *cq = NULL; - struct queue_message *qm = NULL; - list_for_each_entry(q, &sq->pending_queues, node) { - if (!list_empty(&q->ready_queue)) { - struct queue_message *m = list_first_entry( - &q->ready_queue, struct queue_message, node); - if (m->req_clock < min_clock) { - min_clock = m->req_clock; - cq = q; - qm = m; - } - } - } - // Append message to outgoing command - if (out->len + qm->len > sizeof(out->msg) - MESSAGE_TRAILER_SIZE) - break; - list_del(&qm->node); - if (list_empty(&cq->ready_queue) && list_empty(&cq->stalled_queue)) - list_del(&cq->node); - memcpy(&out->msg[out->len], qm->msg, qm->len); - out->len += qm->len; - sq->ready_bytes -= qm->len; - message_free(qm); - } - - // Fill header / trailer - out->len += MESSAGE_TRAILER_SIZE; - out->msg[MESSAGE_POS_LEN] = out->len; - out->msg[MESSAGE_POS_SEQ] = MESSAGE_DEST | (sq->send_seq & MESSAGE_SEQ_MASK); - uint16_t crc = crc16_ccitt(out->msg, out->len - MESSAGE_TRAILER_SIZE); - out->msg[out->len - MESSAGE_TRAILER_CRC] = crc >> 8; - out->msg[out->len - MESSAGE_TRAILER_CRC+1] = crc & 0xff; - out->msg[out->len - MESSAGE_TRAILER_SYNC] = MESSAGE_SYNC; - - // Send message - int ret = write(sq->serial_fd, out->msg, out->len); - if (ret < 0) - report_errno("write", ret); - sq->bytes_write += out->len; - if (eventtime > sq->idle_time) - sq->idle_time = eventtime; - sq->idle_time += out->len * sq->baud_adjust; - out->sent_time = eventtime; - out->receive_time = sq->idle_time; - if (list_empty(&sq->sent_queue)) - pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT - , sq->idle_time + sq->rto); - if (!sq->rtt_sample_seq) - sq->rtt_sample_seq = sq->send_seq; - sq->send_seq++; - sq->need_ack_bytes += out->len; - list_add_tail(&out->node, &sq->sent_queue); -} - -// Determine the time the next serial data should be sent -static double -check_send_command(struct serialqueue *sq, double eventtime) -{ - if ((sq->send_seq - sq->receive_seq >= MESSAGE_SEQ_MASK - || (sq->need_ack_bytes - 2*MESSAGE_MAX) * sq->baud_adjust > sq->srtt) - && sq->receive_seq != (uint64_t)-1) - // Need an ack before more messages can be sent - return PR_NEVER; - - // Check for stalled messages now ready - double idletime = eventtime > sq->idle_time ? eventtime : sq->idle_time; - idletime += MESSAGE_MIN * sq->baud_adjust; - double timedelta = idletime - sq->last_clock_time; - uint64_t ack_clock = ((uint64_t)(timedelta * sq->est_freq) - + sq->last_clock); - uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK; - struct command_queue *cq; - list_for_each_entry(cq, &sq->pending_queues, node) { - // Move messages from the stalled_queue to the ready_queue - while (!list_empty(&cq->stalled_queue)) { - struct queue_message *qm = list_first_entry( - &cq->stalled_queue, struct queue_message, node); - if (ack_clock < qm->min_clock) { - if (qm->min_clock < min_stalled_clock) - min_stalled_clock = qm->min_clock; - break; - } - list_del(&qm->node); - list_add_tail(&qm->node, &cq->ready_queue); - sq->stalled_bytes -= qm->len; - sq->ready_bytes += qm->len; - } - // Update min_ready_clock - if (!list_empty(&cq->ready_queue)) { - struct queue_message *qm = list_first_entry( - &cq->ready_queue, struct queue_message, node); - uint64_t req_clock = qm->req_clock; - if (req_clock == BACKGROUND_PRIORITY_CLOCK) - req_clock = (uint64_t)( - (sq->idle_time - sq->last_clock_time + MIN_BACKGROUND_DELTA) - * sq->est_freq) + sq->last_clock; - if (req_clock < min_ready_clock) - min_ready_clock = req_clock; - } - } - - // Check for messages to send - if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX) - return PR_NOW; - if (! sq->est_freq) { - if (sq->ready_bytes) - return PR_NOW; - sq->need_kick_clock = MAX_CLOCK; - return PR_NEVER; - } - uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->est_freq; - if (min_ready_clock <= ack_clock + reqclock_delta) - return PR_NOW; - uint64_t wantclock = min_ready_clock - reqclock_delta; - if (min_stalled_clock < wantclock) - wantclock = min_stalled_clock; - sq->need_kick_clock = wantclock; - return idletime + (wantclock - ack_clock) / sq->est_freq; -} - -// Callback timer to send data to the serial port -static double -command_event(struct serialqueue *sq, double eventtime) -{ - pthread_mutex_lock(&sq->lock); - double waketime; - for (;;) { - waketime = check_send_command(sq, eventtime); - if (waketime != PR_NOW) - break; - build_and_send_command(sq, eventtime); - } - pthread_mutex_unlock(&sq->lock); - return waketime; -} - -// Main background thread for reading/writing to serial port -static void * -background_thread(void *data) -{ - struct serialqueue *sq = data; - pollreactor_run(&sq->pr); - - pthread_mutex_lock(&sq->lock); - check_wake_receive(sq); - pthread_mutex_unlock(&sq->lock); - - return NULL; -} - -// Create a new 'struct serialqueue' object -struct serialqueue * -serialqueue_alloc(int serial_fd, int write_only) -{ - struct serialqueue *sq = malloc(sizeof(*sq)); - memset(sq, 0, sizeof(*sq)); - - // Reactor setup - sq->serial_fd = serial_fd; - int ret = pipe(sq->pipe_fds); - if (ret) - goto fail; - pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq); - if (!write_only) - pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event); - pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event); - pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event); - pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event); - set_non_blocking(serial_fd); - set_non_blocking(sq->pipe_fds[0]); - set_non_blocking(sq->pipe_fds[1]); - - // Retransmit setup - sq->send_seq = 1; - if (write_only) { - sq->receive_seq = -1; - sq->rto = PR_NEVER; - } else { - sq->receive_seq = 1; - sq->rto = MIN_RTO; - } - - // Queues - sq->need_kick_clock = MAX_CLOCK; - list_init(&sq->pending_queues); - list_init(&sq->sent_queue); - list_init(&sq->receive_queue); - - // Debugging - list_init(&sq->old_sent); - list_init(&sq->old_receive); - debug_queue_alloc(&sq->old_sent, DEBUG_QUEUE_SENT); - debug_queue_alloc(&sq->old_receive, DEBUG_QUEUE_RECEIVE); - - // Thread setup - ret = pthread_mutex_init(&sq->lock, NULL); - if (ret) - goto fail; - ret = pthread_cond_init(&sq->cond, NULL); - if (ret) - goto fail; - ret = pthread_create(&sq->tid, NULL, background_thread, sq); - if (ret) - goto fail; - - return sq; - -fail: - report_errno("init", ret); - return NULL; -} - -// Request that the background thread exit -void -serialqueue_exit(struct serialqueue *sq) -{ - pollreactor_do_exit(&sq->pr); - kick_bg_thread(sq); - int ret = pthread_join(sq->tid, NULL); - if (ret) - report_errno("pthread_join", ret); -} - -// Free all resources associated with a serialqueue -void -serialqueue_free(struct serialqueue *sq) -{ - if (!sq) - return; - if (!pollreactor_is_exit(&sq->pr)) - serialqueue_exit(sq); - pthread_mutex_lock(&sq->lock); - message_queue_free(&sq->sent_queue); - message_queue_free(&sq->receive_queue); - message_queue_free(&sq->old_sent); - message_queue_free(&sq->old_receive); - while (!list_empty(&sq->pending_queues)) { - struct command_queue *cq = list_first_entry( - &sq->pending_queues, struct command_queue, node); - list_del(&cq->node); - message_queue_free(&cq->ready_queue); - message_queue_free(&cq->stalled_queue); - } - pthread_mutex_unlock(&sq->lock); - pollreactor_free(&sq->pr); - free(sq); -} - -// Allocate a 'struct command_queue' -struct command_queue * -serialqueue_alloc_commandqueue(void) -{ - struct command_queue *cq = malloc(sizeof(*cq)); - memset(cq, 0, sizeof(*cq)); - list_init(&cq->ready_queue); - list_init(&cq->stalled_queue); - return cq; -} - -// Free a 'struct command_queue' -void -serialqueue_free_commandqueue(struct command_queue *cq) -{ - if (!cq) - return; - if (!list_empty(&cq->ready_queue) || !list_empty(&cq->stalled_queue)) { - errorf("Memory leak! Can't free non-empty commandqueue"); - return; - } - free(cq); -} - -// Add a batch of messages to the given command_queue -void -serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq - , struct list_head *msgs) -{ - // Make sure min_clock is set in list and calculate total bytes - int len = 0; - struct queue_message *qm; - list_for_each_entry(qm, msgs, node) { - if (qm->min_clock + (1LL<<31) < qm->req_clock - && qm->req_clock != BACKGROUND_PRIORITY_CLOCK) - qm->min_clock = qm->req_clock - (1LL<<31); - len += qm->len; - } - if (! len) - return; - qm = list_first_entry(msgs, struct queue_message, node); - - // Add list to cq->stalled_queue - pthread_mutex_lock(&sq->lock); - if (list_empty(&cq->ready_queue) && list_empty(&cq->stalled_queue)) - list_add_tail(&cq->node, &sq->pending_queues); - list_join_tail(msgs, &cq->stalled_queue); - sq->stalled_bytes += len; - int mustwake = 0; - if (qm->min_clock < sq->need_kick_clock) { - sq->need_kick_clock = 0; - mustwake = 1; - } - pthread_mutex_unlock(&sq->lock); - - // Wake the background thread if necessary - if (mustwake) - kick_bg_thread(sq); -} - -// Schedule the transmission of a message on the serial port at a -// given time and priority. -void -serialqueue_send(struct serialqueue *sq, struct command_queue *cq - , uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock) -{ - struct queue_message *qm = message_fill(msg, len); - qm->min_clock = min_clock; - qm->req_clock = req_clock; - - struct list_head msgs; - list_init(&msgs); - list_add_tail(&qm->node, &msgs); - serialqueue_send_batch(sq, cq, &msgs); -} - -// Like serialqueue_send() but also builds the message to be sent -void -serialqueue_encode_and_send(struct serialqueue *sq, struct command_queue *cq - , uint32_t *data, int len - , uint64_t min_clock, uint64_t req_clock) -{ - struct queue_message *qm = message_alloc_and_encode(data, len); - qm->min_clock = min_clock; - qm->req_clock = req_clock; - - struct list_head msgs; - list_init(&msgs); - list_add_tail(&qm->node, &msgs); - serialqueue_send_batch(sq, cq, &msgs); -} - -// Return a message read from the serial port (or wait for one if none -// available) -void -serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) -{ - pthread_mutex_lock(&sq->lock); - // Wait for message to be available - while (list_empty(&sq->receive_queue)) { - if (pollreactor_is_exit(&sq->pr)) - goto exit; - sq->receive_waiting = 1; - int ret = pthread_cond_wait(&sq->cond, &sq->lock); - if (ret) - report_errno("pthread_cond_wait", ret); - } - - // Remove message from queue - struct queue_message *qm = list_first_entry( - &sq->receive_queue, struct queue_message, node); - list_del(&qm->node); - - // Copy message - memcpy(pqm->msg, qm->msg, qm->len); - pqm->len = qm->len; - pqm->sent_time = qm->sent_time; - pqm->receive_time = qm->receive_time; - debug_queue_add(&sq->old_receive, qm); - - pthread_mutex_unlock(&sq->lock); - return; - -exit: - pqm->len = -1; - pthread_mutex_unlock(&sq->lock); -} - -void -serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust) -{ - pthread_mutex_lock(&sq->lock); - sq->baud_adjust = baud_adjust; - pthread_mutex_unlock(&sq->lock); -} - -// Set the estimated clock rate of the mcu on the other end of the -// serial port -void -serialqueue_set_clock_est(struct serialqueue *sq, double est_freq - , double last_clock_time, uint64_t last_clock) -{ - pthread_mutex_lock(&sq->lock); - sq->est_freq = est_freq; - sq->last_clock_time = last_clock_time; - sq->last_clock = last_clock; - pthread_mutex_unlock(&sq->lock); -} - -// Return a string buffer containing statistics for the serial port -void -serialqueue_get_stats(struct serialqueue *sq, char *buf, int len) -{ - struct serialqueue stats; - pthread_mutex_lock(&sq->lock); - memcpy(&stats, sq, sizeof(stats)); - pthread_mutex_unlock(&sq->lock); - - snprintf(buf, len, "bytes_write=%u bytes_read=%u" - " bytes_retransmit=%u bytes_invalid=%u" - " send_seq=%u receive_seq=%u retransmit_seq=%u" - " srtt=%.3f rttvar=%.3f rto=%.3f" - " ready_bytes=%u stalled_bytes=%u" - , stats.bytes_write, stats.bytes_read - , stats.bytes_retransmit, stats.bytes_invalid - , (int)stats.send_seq, (int)stats.receive_seq - , (int)stats.retransmit_seq - , stats.srtt, stats.rttvar, stats.rto - , stats.ready_bytes, stats.stalled_bytes); -} - -// Extract old messages stored in the debug queues -int -serialqueue_extract_old(struct serialqueue *sq, int sentq - , struct pull_queue_message *q, int max) -{ - int count = sentq ? DEBUG_QUEUE_SENT : DEBUG_QUEUE_RECEIVE; - struct list_head *rootp = sentq ? &sq->old_sent : &sq->old_receive; - struct list_head replacement, current; - list_init(&replacement); - debug_queue_alloc(&replacement, count); - list_init(¤t); - - // Atomically replace existing debug list with new zero'd list - pthread_mutex_lock(&sq->lock); - list_join_tail(rootp, ¤t); - list_init(rootp); - list_join_tail(&replacement, rootp); - pthread_mutex_unlock(&sq->lock); - - // Walk the debug list - int pos = 0; - while (!list_empty(¤t)) { - struct queue_message *qm = list_first_entry( - ¤t, struct queue_message, node); - if (qm->len && pos < max) { - struct pull_queue_message *pqm = q++; - pos++; - memcpy(pqm->msg, qm->msg, qm->len); - pqm->len = qm->len; - pqm->sent_time = qm->sent_time; - pqm->receive_time = qm->receive_time; - } - list_del(&qm->node); - message_free(qm); - } - return pos; -} |