1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
// Trigger sync "trsync" message dispatch
//
// Copyright (C) 2021 Kevin O'Connor <kevin@koconnor.net>
//
// This file may be distributed under the terms of the GNU GPLv3 license.
#include <pthread.h> // pthread_mutex_lock
#include <stddef.h> // offsetof
#include <stdlib.h> // malloc
#include <string.h> // memset
#include "compiler.h" // ARRAY_SIZE
#include "list.h" // list_add_tail
#include "pollreactor.h" // PR_NEVER
#include "pyhelper.h" // report_errno
#include "serialqueue.h" // serialqueue_add_fastreader
struct trdispatch {
struct list_head tdm_list;
pthread_mutex_t lock; // protects variables below
uint32_t is_active, can_trigger, dispatch_reason;
};
struct trdispatch_mcu {
struct fastreader fr;
struct trdispatch *td;
struct list_node node;
struct serialqueue *sq;
struct command_queue *cq;
uint32_t trsync_oid, set_timeout_msgtag, trigger_msgtag;
// Remaining fields protected by trdispatch lock
uint64_t last_status_clock, expire_clock;
uint64_t expire_ticks, min_extend_ticks;
struct clock_estimate ce;
};
// Send: trsync_trigger oid=%c reason=%c
static void
send_trsync_trigger(struct trdispatch_mcu *tdm)
{
uint32_t msg[3] = {
tdm->trigger_msgtag, tdm->trsync_oid, tdm->td->dispatch_reason
};
struct queue_message *qm = message_alloc_and_encode(msg, ARRAY_SIZE(msg));
serialqueue_send_one(tdm->sq, tdm->cq, qm);
}
// Send: trsync_set_timeout oid=%c clock=%u
static void
send_trsync_set_timeout(struct trdispatch_mcu *tdm)
{
uint32_t msg[3] = {
tdm->set_timeout_msgtag, tdm->trsync_oid, tdm->expire_clock
};
struct queue_message *qm = message_alloc_and_encode(msg, ARRAY_SIZE(msg));
qm->req_clock = tdm->expire_clock;
serialqueue_send_one(tdm->sq, tdm->cq, qm);
}
// Handle a trsync_state message (callback from serialqueue fastreader)
static void
handle_trsync_state(struct fastreader *fr, uint8_t *data, int len)
{
struct trdispatch_mcu *tdm = container_of(fr, struct trdispatch_mcu, fr);
// Parse: trsync_state oid=%c can_trigger=%c trigger_reason=%c clock=%u
uint32_t fields[5];
int ret = msgblock_decode(fields, ARRAY_SIZE(fields), data, len);
if (ret || fields[1] != tdm->trsync_oid)
return;
uint32_t can_trigger=fields[2], clock=fields[4];
// Process message
struct trdispatch *td = tdm->td;
pthread_mutex_lock(&td->lock);
if (!td->can_trigger)
goto done;
if (!can_trigger) {
// mcu reports trigger or timeout - propagate to all mcus
td->can_trigger = 0;
struct trdispatch_mcu *m;
list_for_each_entry(m, &td->tdm_list, node) {
send_trsync_trigger(m);
}
goto done;
}
// mcu is still working okay - update last_status_clock
serialqueue_get_clock_est(tdm->sq, &tdm->ce);
tdm->last_status_clock = clock_from_clock32(&tdm->ce, clock);
// Determine minimum acknowledged time among all mcus
double min_time = PR_NEVER, next_min_time = PR_NEVER;
struct trdispatch_mcu *m, *min_tdm = NULL;
list_for_each_entry(m, &td->tdm_list, node) {
double status_time = clock_to_time(&m->ce, m->last_status_clock);
if (status_time < next_min_time) {
next_min_time = status_time;
if (status_time < min_time) {
next_min_time = min_time;
min_time = status_time;
min_tdm = m;
}
}
}
if (next_min_time == PR_NEVER)
next_min_time = min_time;
// Send trsync_set_timeout messages to other mcus (if needed)
list_for_each_entry(m, &td->tdm_list, node) {
double status_time = m == min_tdm ? next_min_time : min_time;
uint64_t expire=clock_from_time(&m->ce, status_time) + m->expire_ticks;
if ((int64_t)(expire - m->expire_clock) >= m->min_extend_ticks) {
m->expire_clock = expire;
send_trsync_set_timeout(m);
}
}
done:
pthread_mutex_unlock(&td->lock);
}
// Begin synchronization
void __visible
trdispatch_start(struct trdispatch *td, uint32_t dispatch_reason)
{
pthread_mutex_lock(&td->lock);
if (td->is_active || list_empty(&td->tdm_list)) {
pthread_mutex_unlock(&td->lock);
return;
}
td->dispatch_reason = dispatch_reason;
td->is_active = td->can_trigger = 1;
pthread_mutex_unlock(&td->lock);
// Register handle_trsync_state message parser for each mcu
struct trdispatch_mcu *tdm;
list_for_each_entry(tdm, &td->tdm_list, node) {
serialqueue_add_fastreader(tdm->sq, &tdm->fr);
}
}
// Cleanup after a test completes
void __visible
trdispatch_stop(struct trdispatch *td)
{
pthread_mutex_lock(&td->lock);
if (!td->is_active) {
pthread_mutex_unlock(&td->lock);
return;
}
td->is_active = 0;
pthread_mutex_unlock(&td->lock);
// Unregister handle_trsync_state message parsers
struct trdispatch_mcu *tdm;
list_for_each_entry(tdm, &td->tdm_list, node) {
serialqueue_rm_fastreader(tdm->sq, &tdm->fr);
}
}
// Create a new 'struct trdispatch' object
struct trdispatch * __visible
trdispatch_alloc(void)
{
struct trdispatch *td = malloc(sizeof(*td));
memset(td, 0, sizeof(*td));
list_init(&td->tdm_list);
int ret = pthread_mutex_init(&td->lock, NULL);
if (ret) {
report_errno("trdispatch_alloc pthread_mutex_init", ret);
return NULL;
}
return td;
}
// Create a new 'struct trdispatch_mcu' object
struct trdispatch_mcu * __visible
trdispatch_mcu_alloc(struct trdispatch *td, struct serialqueue *sq
, struct command_queue *cq, uint32_t trsync_oid
, uint32_t set_timeout_msgtag, uint32_t trigger_msgtag
, uint32_t state_msgtag)
{
struct trdispatch_mcu *tdm = malloc(sizeof(*tdm));
memset(tdm, 0, sizeof(*tdm));
tdm->sq = sq;
tdm->cq = cq;
tdm->trsync_oid = trsync_oid;
tdm->set_timeout_msgtag = set_timeout_msgtag;
tdm->trigger_msgtag = trigger_msgtag;
// Setup fastreader to match trsync_state messages
uint32_t state_prefix[] = {state_msgtag, trsync_oid};
struct queue_message *dummy = message_alloc_and_encode(
state_prefix, ARRAY_SIZE(state_prefix));
memcpy(tdm->fr.prefix, dummy->msg, dummy->len);
tdm->fr.prefix_len = dummy->len;
free(dummy);
tdm->fr.func = handle_trsync_state;
tdm->td = td;
list_add_tail(&tdm->node, &td->tdm_list);
return tdm;
}
// Setup for a trigger test
void __visible
trdispatch_mcu_setup(struct trdispatch_mcu *tdm
, uint64_t last_status_clock, uint64_t expire_clock
, uint64_t expire_ticks, uint64_t min_extend_ticks)
{
struct trdispatch *td = tdm->td;
pthread_mutex_lock(&td->lock);
tdm->last_status_clock = last_status_clock;
tdm->expire_clock = expire_clock;
tdm->expire_ticks = expire_ticks;
tdm->min_extend_ticks = min_extend_ticks;
serialqueue_get_clock_est(tdm->sq, &tdm->ce);
pthread_mutex_unlock(&td->lock);
}
|