aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKevin O'Connor <kevin@koconnor.net>2020-02-14 20:47:08 -0500
committerKevin O'Connor <kevin@koconnor.net>2020-02-20 12:01:21 -0500
commitc6c360c4e14374a56dcb0477e1e7759683841093 (patch)
tree070667687291ab046f449127e37b8aeec7d74f5e
parent7b90830ae578544cf6ebe7bdbc70baea1f6b1509 (diff)
downloadkutter-c6c360c4e14374a56dcb0477e1e7759683841093.tar.gz
kutter-c6c360c4e14374a56dcb0477e1e7759683841093.tar.xz
kutter-c6c360c4e14374a56dcb0477e1e7759683841093.zip
serialqueue: Support notification of when a command is processed
Add ability for the host code to get a notification when the ack for a command sent to the micro-controller is received. This is in preparation for improved detection of message loss between mcu and host. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
-rw-r--r--klippy/chelper/__init__.py4
-rw-r--r--klippy/chelper/serialqueue.c55
-rw-r--r--klippy/chelper/serialqueue.h6
-rw-r--r--klippy/serialhdl.py31
4 files changed, 79 insertions, 17 deletions
diff --git a/klippy/chelper/__init__.py b/klippy/chelper/__init__.py
index c979340b..1a4b2d74 100644
--- a/klippy/chelper/__init__.py
+++ b/klippy/chelper/__init__.py
@@ -110,6 +110,7 @@ defs_serialqueue = """
uint8_t msg[MESSAGE_MAX];
int len;
double sent_time, receive_time;
+ uint64_t notify_id;
};
struct serialqueue *serialqueue_alloc(int serial_fd, int write_only);
@@ -118,7 +119,8 @@ defs_serialqueue = """
struct command_queue *serialqueue_alloc_commandqueue(void);
void serialqueue_free_commandqueue(struct command_queue *cq);
void serialqueue_send(struct serialqueue *sq, struct command_queue *cq
- , uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock);
+ , uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock
+ , uint64_t notify_id);
void serialqueue_pull(struct serialqueue *sq
, struct pull_queue_message *pqm);
void serialqueue_set_baud_adjust(struct serialqueue *sq
diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c
index 61331379..779156ce 100644
--- a/klippy/chelper/serialqueue.c
+++ b/klippy/chelper/serialqueue.c
@@ -1,9 +1,9 @@
// Serial port command queuing
//
-// Copyright (C) 2016-2018 Kevin O'Connor <kevin@koconnor.net>
+// Copyright (C) 2016-2020 Kevin O'Connor <kevin@koconnor.net>
//
// This file may be distributed under the terms of the GNU GPLv3 license.
-//
+
// This goal of this code is to handle low-level serial port
// communications with a microcontroller (mcu). This code is written
// in C (instead of python) to reduce communication latencies and to
@@ -372,6 +372,7 @@ struct serialqueue {
struct list_head pending_queues;
int ready_bytes, stalled_bytes, need_ack_bytes, last_ack_bytes;
uint64_t need_kick_clock;
+ struct list_head notify_queue;
// Received messages
struct list_head receive_queue;
// Debugging
@@ -512,6 +513,25 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
if (rseq != sq->receive_seq)
// New sequence number
update_receive_seq(sq, eventtime, rseq);
+
+ // Check for pending messages on notify_queue
+ int must_wake = 0;
+ while (!list_empty(&sq->notify_queue)) {
+ struct queue_message *qm = list_first_entry(
+ &sq->notify_queue, struct queue_message, node);
+ uint64_t wake_seq = rseq - 1 - (len > MESSAGE_MIN ? 1 : 0);
+ uint64_t notify_msg_sent_seq = qm->req_clock;
+ if (notify_msg_sent_seq > wake_seq)
+ break;
+ list_del(&qm->node);
+ qm->len = 0;
+ qm->sent_time = sq->last_receive_sent_time;
+ qm->receive_time = eventtime;
+ list_add_tail(&qm->node, &sq->receive_queue);
+ must_wake = 1;
+ }
+
+ // Process message
if (len == MESSAGE_MIN) {
// Ack/nak message
if (sq->last_ack_seq < rseq)
@@ -519,18 +539,19 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
else if (rseq > sq->ignore_nak_seq && !list_empty(&sq->sent_queue))
// Duplicate Ack is a Nak - do fast retransmit
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW);
- }
-
- if (len > MESSAGE_MIN) {
- // Add message to receive queue
+ } else {
+ // Data message - add to receive queue
struct queue_message *qm = message_fill(sq->input_buf, len);
qm->sent_time = (rseq > sq->retransmit_seq
? sq->last_receive_sent_time : 0.);
qm->receive_time = get_monotonic(); // must be time post read()
qm->receive_time -= sq->baud_adjust * len;
list_add_tail(&qm->node, &sq->receive_queue);
- check_wake_receive(sq);
+ must_wake = 1;
}
+
+ if (must_wake)
+ check_wake_receive(sq);
}
// Callback for input activity on the serial fd
@@ -661,7 +682,13 @@ build_and_send_command(struct serialqueue *sq, double eventtime)
memcpy(&out->msg[out->len], qm->msg, qm->len);
out->len += qm->len;
sq->ready_bytes -= qm->len;
- message_free(qm);
+ if (qm->notify_id) {
+ // Message requires notification - add to notify list
+ qm->req_clock = sq->send_seq;
+ list_add_tail(&qm->node, &sq->notify_queue);
+ } else {
+ message_free(qm);
+ }
}
// Fill header / trailer
@@ -835,6 +862,7 @@ serialqueue_alloc(int serial_fd, int write_only)
list_init(&sq->pending_queues);
list_init(&sq->sent_queue);
list_init(&sq->receive_queue);
+ list_init(&sq->notify_queue);
// Debugging
list_init(&sq->old_sent);
@@ -882,6 +910,7 @@ serialqueue_free(struct serialqueue *sq)
pthread_mutex_lock(&sq->lock);
message_queue_free(&sq->sent_queue);
message_queue_free(&sq->receive_queue);
+ message_queue_free(&sq->notify_queue);
message_queue_free(&sq->old_sent);
message_queue_free(&sq->old_receive);
while (!list_empty(&sq->pending_queues)) {
@@ -960,11 +989,13 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
// given time and priority.
void __visible
serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg
- , int len, uint64_t min_clock, uint64_t req_clock)
+ , int len, uint64_t min_clock, uint64_t req_clock
+ , uint64_t notify_id)
{
struct queue_message *qm = message_fill(msg, len);
qm->min_clock = min_clock;
qm->req_clock = req_clock;
+ qm->notify_id = notify_id;
struct list_head msgs;
list_init(&msgs);
@@ -998,7 +1029,11 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
pqm->len = qm->len;
pqm->sent_time = qm->sent_time;
pqm->receive_time = qm->receive_time;
- debug_queue_add(&sq->old_receive, qm);
+ pqm->notify_id = qm->notify_id;
+ if (qm->len)
+ debug_queue_add(&sq->old_receive, qm);
+ else
+ message_free(qm);
pthread_mutex_unlock(&sq->lock);
return;
diff --git a/klippy/chelper/serialqueue.h b/klippy/chelper/serialqueue.h
index 807e9883..071923e5 100644
--- a/klippy/chelper/serialqueue.h
+++ b/klippy/chelper/serialqueue.h
@@ -32,6 +32,7 @@ struct queue_message {
double sent_time, receive_time;
};
};
+ uint64_t notify_id;
struct list_node node;
};
@@ -42,6 +43,7 @@ struct pull_queue_message {
uint8_t msg[MESSAGE_MAX];
int len;
double sent_time, receive_time;
+ uint64_t notify_id;
};
struct serialqueue;
@@ -53,8 +55,8 @@ void serialqueue_free_commandqueue(struct command_queue *cq);
void serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
, struct list_head *msgs);
void serialqueue_send(struct serialqueue *sq, struct command_queue *cq
- , uint8_t *msg, int len
- , uint64_t min_clock, uint64_t req_clock);
+ , uint8_t *msg, int len, uint64_t min_clock
+ , uint64_t req_clock, uint64_t notify_id);
void serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm);
void serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust);
void serialqueue_set_clock_est(struct serialqueue *sq, double est_freq
diff --git a/klippy/serialhdl.py b/klippy/serialhdl.py
index 1954bb56..190fd745 100644
--- a/klippy/serialhdl.py
+++ b/klippy/serialhdl.py
@@ -1,6 +1,6 @@
# Serial port management for firmware communication
#
-# Copyright (C) 2016-2019 Kevin O'Connor <kevin@koconnor.net>
+# Copyright (C) 2016-2020 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging, threading
@@ -32,13 +32,22 @@ class SerialReader:
self.handlers = {}
self.register_response(self._handle_unknown_init, '#unknown')
self.register_response(self.handle_output, '#output')
+ # Sent message notification tracking
+ self.last_notify_id = 0
+ self.pending_notifications = {}
def _bg_thread(self):
response = self.ffi_main.new('struct pull_queue_message *')
while 1:
self.ffi_lib.serialqueue_pull(self.serialqueue, response)
count = response.len
- if count <= 0:
+ if count < 0:
break
+ if response.notify_id:
+ params = {'#sent_time': response.sent_time,
+ '#receive_time': response.receive_time}
+ completion = self.pending_notifications.pop(response.notify_id)
+ self.reactor.async_complete(completion, params)
+ continue
params = self.msgparser.parse(response.msg[0:count])
params['#sent_time'] = response.sent_time
params['#receive_time'] = response.receive_time
@@ -126,6 +135,9 @@ class SerialReader:
if self.ser is not None:
self.ser.close()
self.ser = None
+ for pn in self.pending_notifications.values():
+ pn.complete(None)
+ self.pending_notifications.clear()
def stats(self, eventtime):
if self.serialqueue is None:
return ""
@@ -145,8 +157,19 @@ class SerialReader:
self.handlers[name, oid] = callback
# Command sending
def raw_send(self, cmd, minclock, reqclock, cmd_queue):
- self.ffi_lib.serialqueue_send(
- self.serialqueue, cmd_queue, cmd, len(cmd), minclock, reqclock)
+ self.ffi_lib.serialqueue_send(self.serialqueue, cmd_queue,
+ cmd, len(cmd), minclock, reqclock, 0)
+ def raw_send_wait_ack(self, cmd, minclock, reqclock, cmd_queue):
+ self.last_notify_id += 1
+ nid = self.last_notify_id
+ completion = self.reactor.completion()
+ self.pending_notifications[nid] = completion
+ self.ffi_lib.serialqueue_send(self.serialqueue, cmd_queue,
+ cmd, len(cmd), minclock, reqclock, nid)
+ params = completion.wait()
+ if params is None:
+ raise error("Serial connection closed")
+ return params
def send(self, msg, minclock=0, reqclock=0):
cmd = self.msgparser.create_command(msg)
self.raw_send(cmd, minclock, reqclock, self.default_cmd_queue)