Lely core libraries 2.3.4
can_rt.c
Go to the documentation of this file.
1
24#include "io2.h"
25
26#if !LELY_NO_MALLOC
27
28#if !LELY_NO_THREADS
29#include <lely/libc/threads.h>
30#endif
31#include <lely/ev/exec.h>
32#include <lely/ev/strand.h>
33#include <lely/io2/can_rt.h>
34#include <lely/io2/ctx.h>
35#include <lely/util/errnum.h>
36#include <lely/util/util.h>
37
38#include <assert.h>
39#include <stdlib.h>
40
41static io_ctx_t *io_can_rt_dev_get_ctx(const io_dev_t *dev);
42static ev_exec_t *io_can_rt_dev_get_exec(const io_dev_t *dev);
43static size_t io_can_rt_dev_cancel(io_dev_t *dev, struct ev_task *task);
44static size_t io_can_rt_dev_abort(io_dev_t *dev, struct ev_task *task);
45
46// clang-format off
47static const struct io_dev_vtbl io_can_rt_dev_vtbl = {
48 &io_can_rt_dev_get_ctx,
49 &io_can_rt_dev_get_exec,
50 &io_can_rt_dev_cancel,
51 &io_can_rt_dev_abort
52};
53// clang-format on
54
55static void io_can_rt_svc_shutdown(struct io_svc *svc);
56
57// clang-format off
58static const struct io_svc_vtbl io_can_rt_svc_vtbl = {
59 NULL,
60 &io_can_rt_svc_shutdown
61};
62// clang-format on
63
64struct io_can_rt {
65 const struct io_dev_vtbl *dev_vptr;
66 struct io_svc svc;
67 io_ctx_t *ctx;
68 io_can_chan_t *chan;
69 ev_exec_t *exec;
70 struct can_msg msg;
71 struct can_err err;
72 struct io_can_chan_read read;
73 struct ev_task task;
74#if !LELY_NO_THREADS
75 mtx_t mtx;
76#endif
77 unsigned shutdown : 1;
78 unsigned submitted : 1;
79 struct rbtree msg_queue;
80 struct sllist err_queue;
81};
82
83static void io_can_rt_read_func(struct ev_task *task);
84static void io_can_rt_task_func(struct ev_task *task);
85
86static inline io_can_rt_t *io_can_rt_from_dev(const io_dev_t *dev);
87static inline io_can_rt_t *io_can_rt_from_svc(const struct io_svc *svc);
88
89static void io_can_rt_do_pop(io_can_rt_t *rt, struct sllist *msg_queue,
90 struct sllist *err_queue, struct ev_task *task);
91static void io_can_rt_do_pop_read_msg(io_can_rt_t *rt, struct sllist *queue,
92 struct io_can_rt_read_msg *read_msg);
93static void io_can_rt_do_pop_read_err(io_can_rt_t *rt, struct sllist *queue,
94 struct io_can_rt_read_err *read_err);
95
96static void io_can_rt_read_msg_post(struct io_can_rt_read_msg *read_msg,
97 const struct can_msg *msg, int errc);
98static size_t io_can_rt_read_msg_queue_post(
99 struct sllist *queue, const struct can_msg *msg, int errc);
100
101static void io_can_rt_read_err_post(struct io_can_rt_read_err *read_err,
102 const struct can_err *err, int errc);
103static size_t io_can_rt_read_err_queue_post(
104 struct sllist *queue, const struct can_err *err, int errc);
105
106static int io_can_rt_read_msg_cmp(const void *p1, const void *p2);
107
109 ev_promise_t *promise;
110 struct io_can_rt_read_msg read_msg;
111 struct can_msg msg;
112};
113
114static void io_can_rt_async_read_msg_func(struct ev_task *task);
115
117 ev_promise_t *promise;
118 struct io_can_rt_read_err read_err;
119 struct can_err err;
120};
121
122static void io_can_rt_async_read_err_func(struct ev_task *task);
123
125 ev_promise_t *promise;
126 struct ev_task task;
127};
128
129static void io_can_rt_async_shutdown_func(struct ev_task *task);
130
131void *
132io_can_rt_alloc(void)
133{
134 void *ptr = malloc(sizeof(io_can_rt_t));
135#if !LELY_NO_ERRNO
136 if (!ptr)
137 set_errc(errno2c(errno));
138#endif
139 return ptr;
140}
141
142void
143io_can_rt_free(void *ptr)
144{
145 free(ptr);
146}
147
149io_can_rt_init(io_can_rt_t *rt, io_can_chan_t *chan, ev_exec_t *exec)
150{
151 assert(rt);
152 assert(chan);
153 io_dev_t *dev = io_can_chan_get_dev(chan);
154 assert(dev);
155 io_ctx_t *ctx = io_dev_get_ctx(dev);
156 assert(ctx);
157 if (!exec)
158 exec = io_dev_get_exec(dev);
159 assert(exec);
160
161 int errc = 0;
162
163 rt->dev_vptr = &io_can_rt_dev_vtbl;
164
165 rt->svc = (struct io_svc)IO_SVC_INIT(&io_can_rt_svc_vtbl);
166 rt->ctx = ctx;
167
168 rt->chan = chan;
169
170 rt->exec = ev_strand_create(exec);
171 if (!rt->exec) {
172 errc = get_errc();
173 goto error_create_strand;
174 }
175
176 rt->msg = (struct can_msg)CAN_MSG_INIT;
177 rt->err = (struct can_err)CAN_ERR_INIT;
178 rt->read = (struct io_can_chan_read)IO_CAN_CHAN_READ_INIT(&rt->msg,
179 &rt->err, NULL, rt->exec, &io_can_rt_read_func);
180
181 rt->task = (struct ev_task)EV_TASK_INIT(rt->exec, &io_can_rt_task_func);
182
183#if !LELY_NO_THREADS
184 if (mtx_init(&rt->mtx, mtx_plain) != thrd_success) {
185 errc = get_errc();
186 goto error_init_mtx;
187 }
188#endif
189
190 rt->shutdown = 0;
191 rt->submitted = 0;
192
193 rbtree_init(&rt->msg_queue, &io_can_rt_read_msg_cmp);
194 sllist_init(&rt->err_queue);
195
196 io_ctx_insert(rt->ctx, &rt->svc);
197
198 return rt;
199
200#if !LELY_NO_THREADS
201 // mtx_destroy(&rt->mtx);
202error_init_mtx:
203#endif
204 ev_strand_destroy(rt->exec);
205error_create_strand:
206 set_errc(errc);
207 return NULL;
208}
209
210void
211io_can_rt_fini(io_can_rt_t *rt)
212{
213 assert(rt);
214
215 io_ctx_remove(rt->ctx, &rt->svc);
216
217#if !LELY_NO_THREADS
218 mtx_destroy(&rt->mtx);
219#endif
220
221 ev_strand_destroy(rt->exec);
222}
223
226{
227 int errc = 0;
228
229 io_can_rt_t *rt = io_can_rt_alloc();
230 if (!rt) {
231 errc = get_errc();
232 goto error_alloc;
233 }
234
235 io_can_rt_t *tmp = io_can_rt_init(rt, chan, exec);
236 if (!tmp) {
237 errc = get_errc();
238 goto error_init;
239 }
240 rt = tmp;
241
242 return rt;
243
244error_init:
245 io_can_rt_free(rt);
246error_alloc:
247 set_errc(errc);
248 return NULL;
249}
250
251void
253{
254 if (rt) {
255 io_can_rt_fini(rt);
256 io_can_rt_free(rt);
257 }
258}
259
260io_dev_t *
262{
263 assert(rt);
264
265 return &rt->dev_vptr;
266}
267
270{
271 assert(rt);
272
273 return rt->chan;
274}
275
276void
278{
279 assert(rt);
280 assert(read_msg);
281 struct ev_task *task = &read_msg->task;
282
283 task->exec = rt->exec;
285
286#if !LELY_NO_THREADS
287 mtx_lock(&rt->mtx);
288#endif
289 if (rt->shutdown) {
290#if !LELY_NO_THREADS
291 mtx_unlock(&rt->mtx);
292#endif
293 io_can_rt_read_msg_post(
294 read_msg, NULL, errnum2c(ERRNUM_CANCELED));
295 } else {
296 task->_data = &rt->msg_queue;
297 struct rbnode *node = rbtree_find(&rt->msg_queue, read_msg);
298 if (node) {
299 read_msg = structof(
300 node, struct io_can_rt_read_msg, _node);
301 sllist_push_back(&read_msg->_queue, &task->_node);
302 } else {
303 rbnode_init(&read_msg->_node, read_msg);
304 sllist_init(&read_msg->_queue);
305 rbtree_insert(&rt->msg_queue, &read_msg->_node);
306 }
307 int submit = !rt->submitted;
308 if (submit)
309 rt->submitted = 1;
310#if !LELY_NO_THREADS
311 mtx_unlock(&rt->mtx);
312#endif
313 // cppcheck-suppress duplicateCondition
314 if (submit)
315 io_can_chan_submit_read(rt->chan, &rt->read);
316 }
317}
318
319size_t
321{
322 assert(rt);
323 assert(read_msg);
324 struct ev_task *task = &read_msg->task;
325
326 struct sllist queue;
327 sllist_init(&queue);
328
329#if !LELY_NO_THREADS
330 mtx_lock(&rt->mtx);
331#endif
332 if (task->_data == &rt->msg_queue)
333 io_can_rt_do_pop_read_msg(rt, &queue, read_msg);
334#if !LELY_NO_THREADS
335 mtx_unlock(&rt->mtx);
336#endif
337
338 return io_can_rt_read_msg_queue_post(
339 &queue, NULL, errnum2c(ERRNUM_CANCELED));
340}
341
342size_t
344{
345 assert(rt);
346 assert(read_msg);
347 struct ev_task *task = &read_msg->task;
348
349 struct sllist queue;
350 sllist_init(&queue);
351
352#if !LELY_NO_THREADS
353 mtx_lock(&rt->mtx);
354#endif
355 if (task->_data == &rt->msg_queue)
356 io_can_rt_do_pop_read_msg(rt, &queue, read_msg);
357#if !LELY_NO_THREADS
358 mtx_unlock(&rt->mtx);
359#endif
360
361 return ev_task_queue_abort(&queue);
362}
363
366 uint_least8_t flags, struct io_can_rt_read_msg **pread_msg)
367{
369 sizeof(struct io_can_rt_async_read_msg), NULL);
370 if (!promise)
371 return NULL;
372 ev_future_t *future = ev_promise_get_future(promise);
373
374 struct io_can_rt_async_read_msg *async_read_msg =
375 ev_promise_data(promise);
376 async_read_msg->promise = promise;
377 async_read_msg->read_msg =
379 flags, &io_can_rt_async_read_msg_func);
380 async_read_msg->msg = (struct can_msg)CAN_MSG_INIT;
381
382 io_can_rt_submit_read_msg(rt, &async_read_msg->read_msg);
383
384 if (pread_msg)
385 *pread_msg = &async_read_msg->read_msg;
386
387 return future;
388}
389
390void
392{
393 assert(rt);
394 assert(read_err);
395 struct ev_task *task = &read_err->task;
396
397 task->exec = rt->exec;
399
400#if !LELY_NO_THREADS
401 mtx_lock(&rt->mtx);
402#endif
403 if (rt->shutdown) {
404#if !LELY_NO_THREADS
405 mtx_unlock(&rt->mtx);
406#endif
407 io_can_rt_read_err_post(
408 read_err, NULL, errnum2c(ERRNUM_CANCELED));
409 } else {
410 task->_data = &rt->err_queue;
411 sllist_push_back(&rt->err_queue, &task->_node);
412 int submit = !rt->submitted;
413 if (submit)
414 rt->submitted = 1;
415#if !LELY_NO_THREADS
416 mtx_unlock(&rt->mtx);
417#endif
418 // cppcheck-suppress duplicateCondition
419 if (submit)
420 io_can_chan_submit_read(rt->chan, &rt->read);
421 }
422}
423
424size_t
426{
427 assert(rt);
428 assert(read_err);
429 struct ev_task *task = &read_err->task;
430
431 struct sllist queue;
432 sllist_init(&queue);
433
434#if !LELY_NO_THREADS
435 mtx_lock(&rt->mtx);
436#endif
437 if (task->_data == &rt->err_queue)
438 io_can_rt_do_pop_read_err(rt, &queue, read_err);
439#if !LELY_NO_THREADS
440 mtx_unlock(&rt->mtx);
441#endif
442
443 return io_can_rt_read_err_queue_post(
444 &queue, NULL, errnum2c(ERRNUM_CANCELED));
445}
446
447size_t
449{
450 assert(rt);
451 assert(read_err);
452 struct ev_task *task = &read_err->task;
453
454 struct sllist queue;
455 sllist_init(&queue);
456
457#if !LELY_NO_THREADS
458 mtx_lock(&rt->mtx);
459#endif
460 if (task->_data == &rt->err_queue)
461 io_can_rt_do_pop_read_err(rt, &queue, read_err);
462#if !LELY_NO_THREADS
463 mtx_unlock(&rt->mtx);
464#endif
465
466 return ev_task_queue_abort(&queue);
467}
468
471{
473 sizeof(struct io_can_rt_async_read_err), NULL);
474 if (!promise)
475 return NULL;
476 ev_future_t *future = ev_promise_get_future(promise);
477
478 struct io_can_rt_async_read_err *async_read_err =
479 ev_promise_data(promise);
480 async_read_err->promise = promise;
481 async_read_err->read_err =
483 &io_can_rt_async_read_err_func);
484 async_read_err->err = (struct can_err)CAN_ERR_INIT;
485
486 io_can_rt_submit_read_err(rt, &async_read_err->read_err);
487
488 if (pread_err)
489 *pread_err = &async_read_err->read_err;
490
491 return future;
492}
493
496{
498 sizeof(struct io_can_rt_async_shutdown), NULL);
499 if (!promise)
500 return NULL;
501 ev_future_t *future = ev_promise_get_future(promise);
502
503 struct io_can_rt_async_shutdown *async_shutdown =
504 ev_promise_data(promise);
505 async_shutdown->promise = promise;
506 async_shutdown->task = (struct ev_task)EV_TASK_INIT(
507 rt->exec, &io_can_rt_async_shutdown_func);
508
509 io_can_rt_svc_shutdown(&rt->svc);
510 ev_exec_post(async_shutdown->task.exec, &async_shutdown->task);
511
512 return future;
513}
514
515struct io_can_rt_read_msg *
517{
518 return task ? structof(task, struct io_can_rt_read_msg, task) : NULL;
519}
520
521struct io_can_rt_read_err *
523{
524 return task ? structof(task, struct io_can_rt_read_err, task) : NULL;
525}
526
527static io_ctx_t *
528io_can_rt_dev_get_ctx(const io_dev_t *dev)
529{
530 const io_can_rt_t *rt = io_can_rt_from_dev(dev);
531
532 return rt->ctx;
533}
534
535static ev_exec_t *
536io_can_rt_dev_get_exec(const io_dev_t *dev)
537{
538 const io_can_rt_t *rt = io_can_rt_from_dev(dev);
539
540 return rt->exec;
541}
542
543static size_t
544io_can_rt_dev_cancel(io_dev_t *dev, struct ev_task *task)
545{
546 io_can_rt_t *rt = io_can_rt_from_dev(dev);
547
548 struct sllist msg_queue, err_queue;
549 sllist_init(&msg_queue);
550 sllist_init(&err_queue);
551
552#if !LELY_NO_THREADS
553 mtx_lock(&rt->mtx);
554#endif
555 io_can_rt_do_pop(rt, &msg_queue, &err_queue, task);
556#if !LELY_NO_THREADS
557 mtx_unlock(&rt->mtx);
558#endif
559 size_t nmsg = io_can_rt_read_msg_queue_post(
560 &msg_queue, NULL, errnum2c(ERRNUM_CANCELED));
561 size_t nerr = io_can_rt_read_err_queue_post(
562 &err_queue, NULL, errnum2c(ERRNUM_CANCELED));
563 return nerr < SIZE_MAX - nmsg ? nmsg + nerr : SIZE_MAX;
564}
565
566static size_t
567io_can_rt_dev_abort(io_dev_t *dev, struct ev_task *task)
568{
569 io_can_rt_t *rt = io_can_rt_from_dev(dev);
570
571 struct sllist queue;
572 sllist_init(&queue);
573
574#if !LELY_NO_THREADS
575 mtx_lock(&rt->mtx);
576#endif
577 io_can_rt_do_pop(rt, &queue, &queue, task);
578#if !LELY_NO_THREADS
579 mtx_unlock(&rt->mtx);
580#endif
581 return ev_task_queue_abort(&queue);
582}
583
584static void
585io_can_rt_svc_shutdown(struct io_svc *svc)
586{
587 io_can_rt_t *rt = io_can_rt_from_svc(svc);
588 io_dev_t *dev = &rt->dev_vptr;
589
590#if !LELY_NO_THREADS
591 mtx_lock(&rt->mtx);
592#endif
593 int shutdown = !rt->shutdown;
594 rt->shutdown = 1;
595 // Abort io_can_rt_read_func().
596 if (shutdown && rt->submitted
597 && io_can_chan_abort_read(rt->chan, &rt->read))
598 rt->submitted = 0;
599#if !LELY_NO_THREADS
600 mtx_unlock(&rt->mtx);
601#endif
602
603 if (shutdown)
604 // Cancel all pending operations.
605 io_can_rt_dev_cancel(dev, NULL);
606}
607
608static void
609io_can_rt_read_func(struct ev_task *task)
610{
611 assert(task);
613 io_can_rt_t *rt = structof(read, io_can_rt_t, read);
614
615 if (rt->read.r.result > 0) {
616 struct sllist queue;
617 sllist_init(&queue);
618 struct io_can_rt_read_msg key;
619 key.id = rt->read.msg->id;
620 key.flags = rt->read.msg->flags;
621#if !LELY_NO_THREADS
622 mtx_lock(&rt->mtx);
623#endif
624 struct rbnode *node = rbtree_find(&rt->msg_queue, &key);
625 if (node) {
626 rbtree_remove(&rt->msg_queue, node);
627 struct io_can_rt_read_msg *read_msg = structof(
628 node, struct io_can_rt_read_msg, _node);
629 sllist_push_back(&queue, &read_msg->task._node);
630 sllist_append(&queue, &read_msg->_queue);
631 }
632#if !LELY_NO_THREADS
633 mtx_unlock(&rt->mtx);
634#endif
635 io_can_rt_read_msg_queue_post(&queue, rt->read.msg, 0);
636 } else if (!rt->read.r.result) {
637 struct sllist queue;
638 sllist_init(&queue);
639#if !LELY_NO_THREADS
640 mtx_lock(&rt->mtx);
641#endif
642 sllist_append(&queue, &rt->err_queue);
643#if !LELY_NO_THREADS
644 mtx_unlock(&rt->mtx);
645#endif
646 io_can_rt_read_err_queue_post(&queue, rt->read.err, 0);
647 } else if (rt->read.r.errc) {
648 struct sllist msg_queue, err_queue;
649 sllist_init(&msg_queue);
650 sllist_init(&err_queue);
651#if !LELY_NO_THREADS
652 mtx_lock(&rt->mtx);
653#endif
654 io_can_rt_do_pop(rt, &msg_queue, &err_queue, NULL);
655#if !LELY_NO_THREADS
656 mtx_unlock(&rt->mtx);
657#endif
658 io_can_rt_read_msg_queue_post(
659 &msg_queue, NULL, rt->read.r.errc);
660 io_can_rt_read_err_queue_post(
661 &err_queue, NULL, rt->read.r.errc);
662 }
663
664 ev_exec_post(rt->task.exec, &rt->task);
665}
666
667static void
668io_can_rt_task_func(struct ev_task *task)
669{
670 assert(task);
671 io_can_rt_t *rt = structof(task, io_can_rt_t, task);
672
673#if !LELY_NO_THREADS
674 mtx_lock(&rt->mtx);
675#endif
676 assert(rt->submitted);
677 int submit = rt->submitted =
678 (!rbtree_empty(&rt->msg_queue)
679 || !sllist_empty(&rt->err_queue))
680 && !rt->shutdown;
681#if !LELY_NO_THREADS
682 mtx_unlock(&rt->mtx);
683#endif
684
685 if (submit)
686 io_can_chan_submit_read(rt->chan, &rt->read);
687}
688
689static inline io_can_rt_t *
690io_can_rt_from_dev(const io_dev_t *dev)
691{
692 assert(dev);
693
694 return structof(dev, io_can_rt_t, dev_vptr);
695}
696
697static inline io_can_rt_t *
698io_can_rt_from_svc(const struct io_svc *svc)
699{
700 assert(svc);
701
702 return structof(svc, io_can_rt_t, svc);
703}
704
705static void
706io_can_rt_do_pop(io_can_rt_t *rt, struct sllist *msg_queue,
707 struct sllist *err_queue, struct ev_task *task)
708{
709 assert(rt);
710 assert(msg_queue);
711 assert(err_queue);
712
713 if (!task) {
714 rbtree_foreach (&rt->msg_queue, node) {
715 rbtree_remove(&rt->msg_queue, node);
716 struct io_can_rt_read_msg *read_msg = structof(
717 node, struct io_can_rt_read_msg, _node);
718 sllist_push_back(msg_queue, &read_msg->task._node);
719 sllist_append(msg_queue, &read_msg->_queue);
720 }
721 sllist_append(err_queue, &rt->err_queue);
722 if (rt->submitted)
723 io_can_chan_cancel_read(rt->chan, &rt->read);
724 } else if (task->_data == &rt->msg_queue) {
725 io_can_rt_do_pop_read_msg(rt, msg_queue,
727 } else if (task->_data == &rt->err_queue) {
728 io_can_rt_do_pop_read_err(rt, err_queue,
730 }
731}
732
733static void
734io_can_rt_do_pop_read_msg(io_can_rt_t *rt, struct sllist *queue,
735 struct io_can_rt_read_msg *read_msg)
736{
737 assert(rt);
738 assert(queue);
739 assert(read_msg);
740 struct ev_task *task = &read_msg->task;
741 assert(task->_data == &rt->msg_queue);
742
743 struct rbnode *node = rbtree_find(&rt->msg_queue, read_msg);
744 if (node == &read_msg->_node) {
745 rbtree_remove(&rt->msg_queue, &read_msg->_node);
746 if (!sllist_empty(&read_msg->_queue)) {
747 struct sllist queue = read_msg->_queue;
750 &queue)));
751 rbnode_init(&read_msg->_node, read_msg);
752 sllist_init(&read_msg->_queue);
753 sllist_append(&read_msg->_queue, &queue);
754 rbtree_insert(&rt->msg_queue, &read_msg->_node);
755 } else if (rt->submitted && rbtree_empty(&rt->msg_queue)
756 && sllist_empty(&rt->err_queue)) {
757 io_can_chan_cancel_read(rt->chan, &rt->read);
758 }
759 task->_data = NULL;
760 sllist_push_back(queue, &task->_node);
761 } else if (node) {
762 read_msg = structof(node, struct io_can_rt_read_msg, _node);
763 if (sllist_remove(&read_msg->_queue, &task->_node)) {
764 task->_data = NULL;
765 sllist_push_back(queue, &task->_node);
766 }
767 }
768}
769
770static void
771io_can_rt_do_pop_read_err(io_can_rt_t *rt, struct sllist *queue,
772 struct io_can_rt_read_err *read_err)
773{
774 assert(rt);
775 assert(queue);
776 assert(read_err);
777 struct ev_task *task = &read_err->task;
778 assert(task->_data == &rt->err_queue);
779
780 if (sllist_remove(&rt->err_queue, &task->_node)) {
781 if (rt->submitted && rbtree_empty(&rt->msg_queue)
782 && sllist_empty(&rt->err_queue))
783 io_can_chan_cancel_read(rt->chan, &rt->read);
784 task->_data = NULL;
785 sllist_push_back(queue, &task->_node);
786 }
787}
788
789static void
790io_can_rt_read_msg_post(struct io_can_rt_read_msg *read_msg,
791 const struct can_msg *msg, int errc)
792{
793 read_msg->r.msg = msg;
794 read_msg->r.errc = errc;
795
796 ev_exec_t *exec = read_msg->task.exec;
797 ev_exec_post(exec, &read_msg->task);
799}
800
801static size_t
802io_can_rt_read_msg_queue_post(
803 struct sllist *queue, const struct can_msg *msg, int errc)
804{
805 size_t n = 0;
806
807 struct slnode *node;
808 while ((node = sllist_pop_front(queue))) {
809 struct ev_task *task = ev_task_from_node(node);
810 struct io_can_rt_read_msg *read_msg =
812 io_can_rt_read_msg_post(read_msg, msg, errc);
813 n += n < SIZE_MAX;
814 }
815
816 return n;
817}
818
819static void
820io_can_rt_read_err_post(struct io_can_rt_read_err *read_err,
821 const struct can_err *err, int errc)
822{
823 read_err->r.err = err;
824 read_err->r.errc = errc;
825
826 ev_exec_t *exec = read_err->task.exec;
827 ev_exec_post(exec, &read_err->task);
829}
830
831static size_t
832io_can_rt_read_err_queue_post(
833 struct sllist *queue, const struct can_err *err, int errc)
834{
835 size_t n = 0;
836
837 struct slnode *node;
838 while ((node = sllist_pop_front(queue))) {
839 struct ev_task *task = ev_task_from_node(node);
840 struct io_can_rt_read_err *read_err =
842 io_can_rt_read_err_post(read_err, err, errc);
843 n += n < SIZE_MAX;
844 }
845
846 return n;
847}
848
849static int
850io_can_rt_read_msg_cmp(const void *p1, const void *p2)
851{
852 const struct io_can_rt_read_msg *r1 = p1;
853 assert(r1);
854 const struct io_can_rt_read_msg *r2 = p2;
855 assert(r2);
856
857 int cmp = (r2->id < r1->id) - (r1->id < r2->id);
858 if (!cmp)
859 cmp = (r2->flags < r1->flags) - (r1->flags < r2->flags);
860 return cmp;
861}
862
863static void
864io_can_rt_async_read_msg_func(struct ev_task *task)
865{
866 assert(task);
867 struct io_can_rt_read_msg *read_msg =
869 struct io_can_rt_async_read_msg *async_read_msg = structof(
870 read_msg, struct io_can_rt_async_read_msg, read_msg);
871
872 if (ev_promise_set_acquire(async_read_msg->promise)) {
873 if (read_msg->r.msg) {
874 async_read_msg->msg = *read_msg->r.msg;
875 read_msg->r.msg = &async_read_msg->msg;
876 }
877 ev_promise_set_release(async_read_msg->promise, &read_msg->r);
878 }
879 ev_promise_release(async_read_msg->promise);
880}
881
882static void
883io_can_rt_async_read_err_func(struct ev_task *task)
884{
885 assert(task);
886 struct io_can_rt_read_err *read_err =
888 struct io_can_rt_async_read_err *async_read_err = structof(
889 read_err, struct io_can_rt_async_read_err, read_err);
890
891 if (ev_promise_set_acquire(async_read_err->promise)) {
892 if (read_err->r.err) {
893 async_read_err->err = *read_err->r.err;
894 read_err->r.err = &async_read_err->err;
895 }
896 ev_promise_set_release(async_read_err->promise, &read_err->r);
897 }
898 ev_promise_release(async_read_err->promise);
899}
900
901static void
902io_can_rt_async_shutdown_func(struct ev_task *task)
903{
904 assert(task);
905 struct io_can_rt_async_shutdown *async_shutdown =
906 structof(task, struct io_can_rt_async_shutdown, task);
907
908 ev_promise_set(async_shutdown->promise, NULL);
909 ev_promise_release(async_shutdown->promise);
910}
911
912#endif // !LELY_NO_MALLOC
#define CAN_MSG_INIT
The static initializer for can_msg.
Definition msg.h:113
io_can_rt_t * io_can_rt_create(io_can_chan_t *chan, ev_exec_t *exec)
Creates a new CAN frame router.
Definition can_rt.c:225
size_t io_can_rt_cancel_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Cancels the specified CAN error frame read operation if it is pending.
Definition can_rt.c:425
struct io_can_rt_read_err * io_can_rt_read_err_from_task(struct ev_task *task)
Obtains a pointer to a CAN error frame read operation from a pointer to its completion task.
Definition can_rt.c:522
void io_can_rt_submit_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Submits a CAN error frame read operation to a CAN frame router.
Definition can_rt.c:391
io_dev_t * io_can_rt_get_dev(const io_can_rt_t *rt)
Returns a pointer to the abstract I/O device representing the CAN frame router.
Definition can_rt.c:261
io_can_chan_t * io_can_rt_get_chan(const io_can_rt_t *rt)
Returns a pointer to the CAN channel used by the CAN frame router.
Definition can_rt.c:269
struct io_can_rt_read_msg * io_can_rt_read_msg_from_task(struct ev_task *task)
Obtains a pointer to a CAN frame read operation from a pointer to its completion task.
Definition can_rt.c:516
size_t io_can_rt_abort_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Aborts the specified CAN error frame read operation if it is pending.
Definition can_rt.c:448
size_t io_can_rt_abort_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Aborts the specified CAN frame read operation if it is pending.
Definition can_rt.c:343
void io_can_rt_destroy(io_can_rt_t *rt)
Destroys a CAN frame router.
Definition can_rt.c:252
void io_can_rt_submit_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Submits a CAN frame read operation to a CAN frame router.
Definition can_rt.c:277
size_t io_can_rt_cancel_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Cancels the specified CAN frame read operation if it is pending.
Definition can_rt.c:320
This header file is part of the I/O library; it contains the CAN frame router declarations.
#define IO_CAN_RT_READ_MSG_INIT(id, flags, func)
The static initializer for io_can_rt_read_msg.
Definition can_rt.h:77
#define IO_CAN_RT_READ_ERR_INIT(func)
The static initializer for io_can_rt_read_err.
Definition can_rt.h:111
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition ctx.c:141
This header file is part of the utilities library; it contains the native and platform-independent er...
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition errnum.c:810
@ ERRNUM_CANCELED
Operation canceled.
Definition errnum.h:99
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition errnum.c:46
This header file is part of the event library; it contains the abstract task executor interface.
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition exec.h:124
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition exec.h:112
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition exec.h:106
void ev_promise_release(ev_promise_t *promise)
Releases a reference to a promise.
Definition future.c:198
ev_future_t * ev_promise_get_future(ev_promise_t *promise)
Returns (a reference to) a future associated with the specified promise.
Definition future.c:312
int ev_promise_set(ev_promise_t *promise, void *value)
Satiesfies a promise, if it was not aready satisfied, and stores the specified value for retrieval by...
Definition future.c:242
void ev_promise_set_release(ev_promise_t *promise, void *value)
Satisfies a promise prepared by ev_promise_set_acquire(), and stores the specified value for retrieva...
Definition future.c:275
void * ev_promise_data(const ev_promise_t *promise)
Returns a pointer to the shared state of a promise.
Definition future.c:236
ev_promise_t * ev_promise_create(size_t size, ev_promise_dtor_t *dtor)
Constructs a new promise with an optional empty shared state.
Definition future.c:154
int ev_promise_set_acquire(ev_promise_t *promise)
Checks if the specified promise can be satisfied by the caller and, if so, prevents others from satis...
Definition future.c:251
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition ev.h:29
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition can.h:59
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition can.c:93
static size_t io_can_chan_abort_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Aborts the specified CAN channel read operation if it is pending.
Definition can.h:500
static size_t io_can_chan_cancel_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Cancels the specified CAN channel read operation if it is pending.
Definition can.h:494
io_dev_t * io_can_chan_get_dev(const io_can_chan_t *chan)
Returns a pointer to the abstract I/O device representing the CAN channel.
Definition can.h:469
void io_can_chan_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Submits a read operation to a CAN channel.
Definition can.h:488
#define IO_CAN_CHAN_READ_INIT(msg, err, tp, exec, func)
The static initializer for io_can_chan_read.
Definition can.h:104
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition util.h:93
#define CAN_ERR_INIT
The static initializer for a can_err struct.
Definition err.h:43
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition dev.h:35
io_ctx_t * io_dev_get_ctx(const io_dev_t *dev)
Returns a pointer to the I/O context with which the I/O device is registered.
Definition dev.h:80
ev_exec_t * io_dev_get_exec(const io_dev_t *dev)
Returns a pointer to the executor used by the I/O device to execute asynchronous tasks.
Definition dev.h:86
void rbnode_init(struct rbnode *node, const void *key)
Initializes a node in a red-black tree.
Definition rbtree.h:237
void rbtree_insert(struct rbtree *tree, struct rbnode *node)
Inserts a node into a red-black tree.
Definition rbtree.c:108
struct rbnode * rbtree_find(const struct rbtree *tree, const void *key)
Finds a node in a red-black tree.
Definition rbtree.c:306
void rbtree_init(struct rbtree *tree, rbtree_cmp_t *cmp)
Initializes a red-black tree.
Definition rbtree.h:248
void rbtree_remove(struct rbtree *tree, struct rbnode *node)
Removes a node from a red-black tree.
Definition rbtree.c:187
int rbtree_empty(const struct rbtree *tree)
Returns 1 if the red-black tree is empty, and 0 if not.
Definition rbtree.h:259
#define rbtree_foreach(tree, node)
Iterates over each node in a red-black tree in ascending order.
Definition rbtree.h:234
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition sllist.h:257
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition sllist.h:243
This is the internal header file of the I/O library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
This header file is part of the event library; it contains the strand executor declarations.
void ev_strand_destroy(ev_exec_t *exec)
Destroys a strand executor.
Definition strand.c:180
ev_exec_t * ev_strand_create(ev_exec_t *inner_exec)
Creates a strand executor.
Definition strand.c:153
A CAN error frame.
Definition err.h:28
A CAN or CAN FD format frame.
Definition msg.h:87
uint_least32_t id
The identifier (11 or 29 bits, depending on the CAN_FLAG_IDE flag).
Definition msg.h:89
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI...
Definition msg.h:94
A future.
Definition future.c:66
A promise.
Definition future.c:95
An executable task.
Definition task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition task.h:43
int result
The result of the read operation: 1 if a CAN frame is received, 0 if an error frame is received,...
Definition can.h:68
int errc
The error number, obtained as if by get_errc(), if result is -1.
Definition can.h:70
A CAN channel read operation.
Definition can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition can.h:98
struct io_can_chan_read_result r
The result of the read operation.
Definition can.h:100
struct can_msg * msg
The address at which to store the CAN frame.
Definition can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition can.h:86
const struct can_err * err
A pointer to the received CAN error frame, or NULL on error (or if the operation is canceled).
Definition can_rt.h:94
int errc
The error number, obtained as if by get_errc(), if err is NULL.
Definition can_rt.h:96
A CAN error frame read operation suitable for use with a CAN frame router.
Definition can_rt.h:100
struct io_can_rt_read_err_result r
The result of the read operation.
Definition can_rt.h:107
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition can_rt.h:105
int errc
The error number, obtained as if by get_errc(), if msg is NULL.
Definition can_rt.h:48
const struct can_msg * msg
A pointer to the received CAN frame, or NULL on error (or if the operation is canceled).
Definition can_rt.h:46
A CAN frame read operation suitable for use with a CAN frame router.
Definition can_rt.h:52
uint_least32_t id
The identifier of the CAN frame to be received.
Definition can_rt.h:57
struct io_can_rt_read_msg_result r
The result of the read operation.
Definition can_rt.h:71
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition can_rt.h:69
uint_least8_t flags
The flags of the CAN frame to be received (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR,...
Definition can_rt.h:64
Definition ctx.c:38
The virtual table of an I/O service.
Definition ctx.h:67
An I/O service.
Definition ctx.h:49
A node in a red-black tree.
Definition rbtree.h:53
const void * key
A pointer to the key for this node.
Definition rbtree.h:59
A red-black tree.
Definition rbtree.h:91
A singly-linked list.
Definition sllist.h:52
A node in a singly-linked list.
Definition sllist.h:40
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
@ thrd_success
Indicates that the requested operation succeeded.
Definition threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition threads.h:109