Lely core libraries 2.3.4
can.c
Go to the documentation of this file.
1
24#include "../can.h"
25
26#if !LELY_NO_MALLOC
27
28#if !LELY_NO_THREADS
29#include <lely/libc/threads.h>
30#endif
31#include <lely/io2/ctx.h>
32#include <lely/io2/user/can.h>
33#include <lely/util/diag.h>
34#include <lely/util/errnum.h>
35#include <lely/util/spscring.h>
36#include <lely/util/time.h>
37#include <lely/util/util.h>
38
39#include <assert.h>
40#include <stdlib.h>
41
42#ifndef LELY_IO_USER_CAN_RXLEN
47#define LELY_IO_USER_CAN_RXLEN 1024
48#endif
49
51 int is_err;
52 union {
53 struct can_msg msg;
54 struct can_err err;
55 } u;
56 struct timespec ts;
57};
58
59static io_ctx_t *io_user_can_chan_dev_get_ctx(const io_dev_t *dev);
60static ev_exec_t *io_user_can_chan_dev_get_exec(const io_dev_t *dev);
61static size_t io_user_can_chan_dev_cancel(io_dev_t *dev, struct ev_task *task);
62static size_t io_user_can_chan_dev_abort(io_dev_t *dev, struct ev_task *task);
63
64// clang-format off
65static const struct io_dev_vtbl io_user_can_chan_dev_vtbl = {
66 &io_user_can_chan_dev_get_ctx,
67 &io_user_can_chan_dev_get_exec,
68 &io_user_can_chan_dev_cancel,
69 &io_user_can_chan_dev_abort
70};
71// clang-format on
72
73static io_dev_t *io_user_can_chan_get_dev(const io_can_chan_t *chan);
74static int io_user_can_chan_get_flags(const io_can_chan_t *chan);
75static int io_user_can_chan_read(io_can_chan_t *chan, struct can_msg *msg,
76 struct can_err *err, struct timespec *tp, int timeout);
77static void io_user_can_chan_submit_read(
78 io_can_chan_t *chan, struct io_can_chan_read *read);
79static int io_user_can_chan_write(
80 io_can_chan_t *chan, const struct can_msg *msg, int timeout);
81static void io_user_can_chan_submit_write(
82 io_can_chan_t *chan, struct io_can_chan_write *write);
83
84// clang-format off
85static const struct io_can_chan_vtbl io_user_can_chan_vtbl = {
86 &io_user_can_chan_get_dev,
87 &io_user_can_chan_get_flags,
88 &io_user_can_chan_read,
89 &io_user_can_chan_submit_read,
90 &io_user_can_chan_write,
91 &io_user_can_chan_submit_write
92};
93// clang-format on
94
95static void io_user_can_chan_svc_shutdown(struct io_svc *svc);
96
97// clang-format off
98static const struct io_svc_vtbl io_user_can_chan_svc_vtbl = {
99 NULL,
100 &io_user_can_chan_svc_shutdown
101};
102// clang-format on
103
107 const struct io_dev_vtbl *dev_vptr;
111 struct io_svc svc;
117 int flags;
131 void *arg;
136#if !LELY_NO_THREADS
145#endif
150#if !LELY_NO_THREADS
156#endif
158 unsigned shutdown : 1;
160 unsigned read_posted : 1;
162 unsigned write_posted : 1;
171};
172
173static void io_user_can_chan_read_task_func(struct ev_task *task);
174static void io_user_can_chan_write_task_func(struct ev_task *task);
175
176static inline struct io_user_can_chan *io_user_can_chan_from_dev(
177 const io_dev_t *dev);
178static inline struct io_user_can_chan *io_user_can_chan_from_chan(
179 const io_can_chan_t *chan);
180static inline struct io_user_can_chan *io_user_can_chan_from_svc(
181 const struct io_svc *svc);
182
183static int io_user_can_chan_on_frame(struct io_user_can_chan *user,
184 const struct io_user_can_frame *frame, int timeout);
185
186static void io_user_can_chan_p_signal(struct spscring *ring, void *arg);
187static void io_user_can_chan_c_signal(struct spscring *ring, void *arg);
188
189static void io_user_can_chan_do_pop(struct io_user_can_chan *user,
190 struct sllist *read_queue, struct sllist *write_queue,
191 struct ev_task *task);
192
193static size_t io_user_can_chan_do_abort_tasks(struct io_user_can_chan *user);
194
195void *
196io_user_can_chan_alloc(void)
197{
198 struct io_user_can_chan *user = malloc(sizeof(*user));
199 if (!user) {
200#if !LELY_NO_ERRNO
201 set_errc(errno2c(errno));
202#endif
203 return NULL;
204 }
205 // Suppress a GCC maybe-uninitialized warning.
206 user->chan_vptr = NULL;
207 // cppcheck-suppress memleak symbolName=user
208 return &user->chan_vptr;
209}
210
211void
212io_user_can_chan_free(void *ptr)
213{
214 if (ptr)
215 free(io_user_can_chan_from_chan(ptr));
216}
217
219io_user_can_chan_init(io_can_chan_t *chan, io_ctx_t *ctx, ev_exec_t *exec,
220 int flags, size_t rxlen, int txtimeo,
222{
223 struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
224 assert(ctx);
225 assert(exec);
226
227 if (flags & ~IO_CAN_BUS_FLAG_MASK) {
229 return NULL;
230 }
231
232 if (!rxlen)
234
235 if (!txtimeo)
237
238 int errc = 0;
239
240 user->dev_vptr = &io_user_can_chan_dev_vtbl;
241 user->chan_vptr = &io_user_can_chan_vtbl;
242
243 user->svc = (struct io_svc)IO_SVC_INIT(&io_user_can_chan_svc_vtbl);
244 user->ctx = ctx;
245
246 user->exec = exec;
247
248 user->flags = flags;
249 user->txtimeo = txtimeo;
250
251 user->func = func;
252 user->arg = arg;
253
254 user->read_task = (struct ev_task)EV_TASK_INIT(
255 user->exec, &io_user_can_chan_read_task_func);
256 user->write_task = (struct ev_task)EV_TASK_INIT(
257 user->exec, &io_user_can_chan_write_task_func);
258
259#if !LELY_NO_THREADS
260 if (mtx_init(&user->p_mtx, mtx_plain) != thrd_success) {
261 errc = get_errc();
262 goto error_init_p_mtx;
263 }
264
265 if (cnd_init(&user->p_cond) != thrd_success) {
266 errc = get_errc();
267 goto error_init_p_cond;
268 }
269
270 if (mtx_init(&user->c_mtx, mtx_plain) != thrd_success) {
271 errc = get_errc();
272 goto error_init_c_mtx;
273 }
274
275 if (cnd_init(&user->c_cond) != thrd_success) {
276 errc = get_errc();
277 goto error_init_c_cond;
278 }
279#endif
280
281 spscring_init(&user->rxring, rxlen);
282 user->rxbuf = calloc(rxlen, sizeof(struct io_user_can_frame));
283 if (!user->rxbuf) {
284#if !LELY_NO_ERRNO
285 errc = errno2c(errno);
286#endif
287 goto error_alloc_rxbuf;
288 }
289
290#if !LELY_NO_THREADS
291 if (mtx_init(&user->mtx, mtx_plain) != thrd_success) {
292 errc = get_errc();
293 goto error_init_mtx;
294 }
295#endif
296
297 user->shutdown = 0;
298 user->read_posted = 0;
299 user->write_posted = 0;
300
301 sllist_init(&user->read_queue);
302 sllist_init(&user->write_queue);
303 user->current_read = NULL;
304 user->current_write = NULL;
305
306 io_ctx_insert(user->ctx, &user->svc);
307
308 return chan;
309
310#if !LELY_NO_THREADS
311 // mtx_destroy(&user->mtx);
312error_init_mtx:
313#endif
314 free(user->rxbuf);
315error_alloc_rxbuf:
316#if !LELY_NO_THREADS
317 cnd_destroy(&user->c_cond);
318error_init_c_cond:
319 mtx_destroy(&user->c_mtx);
320error_init_c_mtx:
321 cnd_destroy(&user->p_cond);
322error_init_p_cond:
323 mtx_destroy(&user->p_mtx);
324error_init_p_mtx:
325#endif
326 set_errc(errc);
327 return NULL;
328}
329
330void
331io_user_can_chan_fini(io_can_chan_t *chan)
332{
333 struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
334
335 io_ctx_remove(user->ctx, &user->svc);
336 // Cancel all pending tasks.
337 io_user_can_chan_svc_shutdown(&user->svc);
338
339 // Abort any consumer wait operation running in a task. Producer wait
340 // operations are only initiated by io_user_can_chan_on_msg() and
341 // io_user_can_chan_on_err(), and should have terminated before this
342 // function is called.
344
345#if !LELY_NO_THREADS
346 int warning = 0;
347 mtx_lock(&user->mtx);
348 // If necessary, busy-wait until io_user_can_chan_read_task_func() and
349 // io_user_can_chan_write_task_func() complete.
350 while (user->read_posted || user->write_posted) {
351 if (io_user_can_chan_do_abort_tasks(user))
352 continue;
353 mtx_unlock(&user->mtx);
354 if (!warning) {
355 warning = 1;
357 "io_user_can_chan_fini() invoked with pending operations");
358 }
359 thrd_yield();
360 mtx_lock(&user->mtx);
361 }
362 mtx_unlock(&user->mtx);
363
364 mtx_destroy(&user->mtx);
365#endif
366
367 free(user->rxbuf);
368
369#if !LELY_NO_THREADS
370 cnd_destroy(&user->c_cond);
371 mtx_destroy(&user->c_mtx);
372 cnd_destroy(&user->p_cond);
373 mtx_destroy(&user->p_mtx);
374#endif
375}
376
380{
381 int errc = 0;
382
383 io_can_chan_t *chan = io_user_can_chan_alloc();
384 if (!chan) {
385 errc = get_errc();
386 goto error_alloc;
387 }
388
389 io_can_chan_t *tmp = io_user_can_chan_init(
390 chan, ctx, exec, flags, rxlen, txtimeo, func, arg);
391 if (!tmp) {
392 errc = get_errc();
393 goto error_init;
394 }
395 chan = tmp;
396
397 return chan;
398
399error_init:
400 io_user_can_chan_free((void *)chan);
401error_alloc:
402 set_errc(errc);
403 return NULL;
404}
405
406void
408{
409 if (chan) {
410 io_user_can_chan_fini(chan);
411 io_user_can_chan_free((void *)chan);
412 }
413}
414
415int
417 const struct timespec *tp, int timeout)
418{
419 struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
420 assert(msg);
421
422#if !LELY_NO_CANFD
423 int flags = 0;
424 if (msg->flags & CAN_FLAG_FDF)
426 if (msg->flags & CAN_FLAG_BRS)
428 if ((flags & user->flags) != flags) {
430 return -1;
431 }
432#endif
433
434 struct io_user_can_frame frame = { .is_err = 0,
435 .u.msg = *msg,
436 .ts = tp ? *tp : (struct timespec){ 0, 0 } };
437 return io_user_can_chan_on_frame(user, &frame, timeout);
438}
439
440int
442 const struct timespec *tp, int timeout)
443{
444 struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
445 assert(err);
446
447 if (!(user->flags & IO_CAN_BUS_FLAG_ERR)) {
449 return -1;
450 }
451
452 struct io_user_can_frame frame = { .is_err = 1,
453 .u.err = *err,
454 .ts = tp ? *tp : (struct timespec){ 0, 0 } };
455 return io_user_can_chan_on_frame(user, &frame, timeout);
456}
457
458static io_ctx_t *
459io_user_can_chan_dev_get_ctx(const io_dev_t *dev)
460{
461 const struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
462
463 return user->ctx;
464}
465
466static ev_exec_t *
467io_user_can_chan_dev_get_exec(const io_dev_t *dev)
468{
469 const struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
470
471 return user->exec;
472}
473
474static size_t
475io_user_can_chan_dev_cancel(io_dev_t *dev, struct ev_task *task)
476{
477 struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
478
479 size_t n = 0;
480
481 struct sllist read_queue, write_queue;
482 sllist_init(&read_queue);
483 sllist_init(&write_queue);
484
485#if !LELY_NO_THREADS
486 mtx_lock(&user->mtx);
487#endif
488 if (user->current_read && (!task || task == user->current_read)) {
489 user->current_read = NULL;
490 n++;
491 }
492 if (user->current_write && (!task || task == user->current_write)) {
493 user->current_write = NULL;
494 n++;
495 }
496 io_user_can_chan_do_pop(user, &read_queue, &write_queue, task);
497#if !LELY_NO_THREADS
498 mtx_unlock(&user->mtx);
499#endif
500
501 size_t nread = io_can_chan_read_queue_post(
502 &read_queue, -1, errnum2c(ERRNUM_CANCELED));
503 n = n < SIZE_MAX - nread ? n + nread : SIZE_MAX;
504 size_t nwrite = io_can_chan_write_queue_post(
505 &write_queue, errnum2c(ERRNUM_CANCELED));
506 n = n < SIZE_MAX - nwrite ? n + nwrite : SIZE_MAX;
507
508 return n;
509}
510
511static size_t
512io_user_can_chan_dev_abort(io_dev_t *dev, struct ev_task *task)
513{
514 struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
515
516 struct sllist queue;
517 sllist_init(&queue);
518
519#if !LELY_NO_THREADS
520 mtx_lock(&user->mtx);
521#endif
522 io_user_can_chan_do_pop(user, &queue, &queue, task);
523#if !LELY_NO_THREADS
524 mtx_unlock(&user->mtx);
525#endif
526
527 return ev_task_queue_abort(&queue);
528}
529
530static io_dev_t *
531io_user_can_chan_get_dev(const io_can_chan_t *chan)
532{
533 const struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
534
535 return &user->dev_vptr;
536}
537
538static int
539io_user_can_chan_get_flags(const io_can_chan_t *chan)
540{
541 const struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
542
543 return user->flags;
544}
545
546static int
547io_user_can_chan_read(io_can_chan_t *chan, struct can_msg *msg,
548 struct can_err *err, struct timespec *tp, int timeout)
549{
550 struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
551
552#if !LELY_NO_THREADS
553 // Compute the absolute timeout for cnd_timedwait().
554 struct timespec ts = { 0, 0 };
555 if (timeout > 0) {
556#if LELY_NO_TIMEOUT
557 timeout = 0;
558 (void)ts;
559#else
560 if (!timespec_get(&ts, TIME_UTC))
561 return -1;
562 timespec_add_msec(&ts, timeout);
563#endif
564 }
565#endif
566
567#if !LELY_NO_THREADS
568 mtx_lock(&user->c_mtx);
569#endif
570 size_t i = 0;
571 for (;;) {
572 // Check if a frame is available in the receive queue.
573 size_t n = 1;
574 i = spscring_c_alloc(&user->rxring, &n);
575 if (n)
576 break;
577 if (!timeout) {
578#if !LELY_NO_THREADS
579 mtx_unlock(&user->c_mtx);
580#endif
582 return -1;
583 }
584 // Submit a wait operation for a single frame.
585 // clang-format off
586 if (!spscring_c_submit_wait(&user->rxring, 1,
587 &io_user_can_chan_c_signal, user))
588 // clang-format on
589 // If the wait condition was already satisfied, try
590 // again.
591 continue;
592 // Wait for the buffer to signal that a frame is
593 // available, or time out if that takes too long.
594#if !LELY_NO_THREADS
595 int result;
596#if !LELY_NO_TIMEOUT
597 if (timeout > 0)
598 result = cnd_timedwait(
599 &user->c_cond, &user->c_mtx, &ts);
600 else
601#endif
602 result = cnd_wait(&user->c_cond, &user->c_mtx);
603 if (result != thrd_success) {
604 mtx_unlock(&user->c_mtx);
605#if !LELY_NO_TIMEOUT
606 if (result == thrd_timedout)
608#endif
609 return -1;
610 }
611#endif
612 }
613 // Copy the frame from the buffer.
614 struct io_user_can_frame *frame = &user->rxbuf[i];
615 int is_err = frame->is_err;
616 if (!is_err && msg)
617 *msg = frame->u.msg;
618 else if (is_err && err)
619 *err = frame->u.err;
620 if (tp)
621 *tp = frame->ts;
622 spscring_c_commit(&user->rxring, 1);
623#if !LELY_NO_THREADS
624 mtx_unlock(&user->c_mtx);
625#endif
626
627 return !is_err;
628}
629
630static void
631io_user_can_chan_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
632{
633 struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
634 assert(read);
635 struct ev_task *task = &read->task;
636
637 if (!task->exec)
638 task->exec = user->exec;
640
641#if !LELY_NO_THREADS
642 mtx_lock(&user->mtx);
643#endif
644 if (user->shutdown) {
645#if !LELY_NO_THREADS
646 mtx_unlock(&user->mtx);
647#endif
648 io_can_chan_read_post(read, -1, errnum2c(ERRNUM_CANCELED));
649 } else {
650 int post_read = !user->read_posted
651 && sllist_empty(&user->read_queue);
652 sllist_push_back(&user->read_queue, &task->_node);
653 if (post_read)
654 user->read_posted = 1;
655#if !LELY_NO_THREADS
656 mtx_unlock(&user->mtx);
657#endif
658 // cppcheck-suppress duplicateCondition
659 if (post_read)
660 ev_exec_post(user->read_task.exec, &user->read_task);
661 }
662}
663
664static int
665io_user_can_chan_write(
666 io_can_chan_t *chan, const struct can_msg *msg, int timeout)
667{
668 struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
669 assert(msg);
670
671#if !LELY_NO_CANFD
672 int flags = 0;
673 if (msg->flags & CAN_FLAG_FDF)
675 if (msg->flags & CAN_FLAG_BRS)
677 if ((flags & user->flags) != flags) {
679 return -1;
680 }
681#endif
682
683 if (!user->func) {
685 return -1;
686 }
687
688 return user->func(msg, timeout, user->arg);
689}
690
691static void
692io_user_can_chan_submit_write(
693 io_can_chan_t *chan, struct io_can_chan_write *write)
694{
695 struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
696 assert(write);
697 struct ev_task *task = &write->task;
698
699#if !LELY_NO_CANFD
700 int flags = 0;
701 if (write->msg->flags & CAN_FLAG_FDF)
702 flags |= IO_CAN_BUS_FLAG_FDF;
703 if (write->msg->flags & CAN_FLAG_BRS)
704 flags |= IO_CAN_BUS_FLAG_BRS;
705#endif
706
707 if (!task->exec)
708 task->exec = user->exec;
710
711#if !LELY_NO_THREADS
712 mtx_lock(&user->mtx);
713#endif
714 if (user->shutdown) {
715#if !LELY_NO_THREADS
716 mtx_unlock(&user->mtx);
717#endif
718 io_can_chan_write_post(write, errnum2c(ERRNUM_CANCELED));
719#if !LELY_NO_CANFD
720 } else if ((flags & user->flags) != flags) {
721#if !LELY_NO_THREADS
722 mtx_unlock(&user->mtx);
723#endif
724 io_can_chan_write_post(write, errnum2c(ERRNUM_INVAL));
725#endif
726 } else if (!user->func) {
727#if !LELY_NO_THREADS
728 mtx_unlock(&user->mtx);
729#endif
730 io_can_chan_write_post(write, errnum2c(ERRNUM_NOSYS));
731 } else {
732 int post_write = !user->write_posted
733 && sllist_empty(&user->write_queue);
734 sllist_push_back(&user->write_queue, &task->_node);
735 if (post_write)
736 user->write_posted = 1;
737#if !LELY_NO_THREADS
738 mtx_unlock(&user->mtx);
739#endif
740 // cppcheck-suppress duplicateCondition
741 if (post_write)
742 ev_exec_post(user->write_task.exec, &user->write_task);
743 }
744}
745
746static void
747io_user_can_chan_svc_shutdown(struct io_svc *svc)
748{
749 struct io_user_can_chan *user = io_user_can_chan_from_svc(svc);
750 io_dev_t *dev = &user->dev_vptr;
751
752#if !LELY_NO_THREADS
753 mtx_lock(&user->mtx);
754#endif
755 int shutdown = !user->shutdown;
756 user->shutdown = 1;
757 if (shutdown)
758 // Try to abort io_user_can_chan_read_task_func() and
759 // io_user_can_chan_write_task_func().
760 io_user_can_chan_do_abort_tasks(user);
761#if !LELY_NO_THREADS
762 mtx_unlock(&user->mtx);
763#endif
764 // cppcheck-suppress duplicateCondition
765 if (shutdown)
766 // Cancel all pending operations.
767 io_user_can_chan_dev_cancel(dev, NULL);
768}
769
770static void
771io_user_can_chan_read_task_func(struct ev_task *task)
772{
773 assert(task);
774 struct io_user_can_chan *user =
775 structof(task, struct io_user_can_chan, read_task);
776 io_can_chan_t *chan = &user->chan_vptr;
777
778 int errsv = get_errc();
779
780 int wouldblock = 0;
781
782#if !LELY_NO_THREADS
783 mtx_lock(&user->mtx);
784#endif
785 // Try to process all pending read operations at once.
786 while ((task = user->current_read = ev_task_from_node(
787 sllist_pop_front(&user->read_queue)))) {
788#if !LELY_NO_THREADS
789 mtx_unlock(&user->mtx);
790#endif
791 struct io_can_chan_read *read =
793 int result = io_user_can_chan_read(
794 chan, read->msg, read->err, read->tp, 0);
795 int errc = result >= 0 ? 0 : get_errc();
796 wouldblock = errc == errnum2c(ERRNUM_AGAIN)
797 || errc == errnum2c(ERRNUM_WOULDBLOCK);
798 if (!wouldblock)
799 // The operation succeeded or failed immediately.
800 io_can_chan_read_post(read, result, errc);
801#if !LELY_NO_THREADS
802 mtx_lock(&user->mtx);
803#endif
804 if (task == user->current_read) {
805 // Put the read operation back on the queue if it would
806 // block, unless it was canceled.
807 if (wouldblock) {
809 &task->_node);
810 task = NULL;
811 }
812 user->current_read = NULL;
813 }
814 assert(!user->current_read);
815 // Stop if the operation did or would block.
816 if (wouldblock)
817 break;
818 }
819 // Repost this task if any read operations remain in the queue.
820 int post_read = !sllist_empty(&user->read_queue) && !user->shutdown;
821 // Register a wait operation if the receive queue is empty.
822 if (post_read && wouldblock) {
823#if !LELY_NO_THREADS
824 mtx_lock(&user->c_mtx);
825#endif
826 // Do not repost this task unless the wait condition can be
827 // satisfied immediately.
828 post_read = !spscring_c_submit_wait(&user->rxring, 1,
829 io_user_can_chan_c_signal, user);
830#if !LELY_NO_THREADS
831 mtx_unlock(&user->c_mtx);
832#endif
833 }
834 user->read_posted = post_read;
835#if !LELY_NO_THREADS
836 mtx_unlock(&user->mtx);
837#endif
838
839 if (task && wouldblock)
840 // The operation would block but was canceled before it could be
841 // requeued.
842 io_can_chan_read_post(io_can_chan_read_from_task(task), -1,
844
845 if (post_read)
846 ev_exec_post(user->read_task.exec, &user->read_task);
847
848 set_errc(errsv);
849}
850
851static void
852io_user_can_chan_write_task_func(struct ev_task *task)
853{
854 assert(task);
855 struct io_user_can_chan *user =
856 structof(task, struct io_user_can_chan, write_task);
857 io_can_chan_t *chan = &user->chan_vptr;
858
859 int errsv = get_errc();
860
861 int wouldblock = 0;
862
863#if !LELY_NO_THREADS
864 mtx_lock(&user->mtx);
865#endif
866 // clang-format off
867 if ((task = user->current_write = ev_task_from_node(
868 sllist_pop_front(&user->write_queue)))) {
869 // clang-format on
870#if !LELY_NO_THREADS
871 mtx_unlock(&user->mtx);
872#endif
873 struct io_can_chan_write *write =
875 int result = io_user_can_chan_write(
876 chan, write->msg, user->txtimeo);
877 int errc = !result ? 0 : get_errc();
878 wouldblock = errc == errnum2c(ERRNUM_AGAIN)
880 if (!wouldblock)
881 // The operation succeeded or failed immediately.
882 io_can_chan_write_post(write, errc);
883#if !LELY_NO_THREADS
884 mtx_lock(&user->mtx);
885#endif
886 if (task == user->current_write) {
887 // Put the read operation back on the queue if it would
888 // block, unless it was canceled.
889 if (wouldblock) {
891 &task->_node);
892 task = NULL;
893 }
894 user->current_write = NULL;
895 }
896 assert(!user->current_write);
897 }
898 int post_write = user->write_posted =
899 !sllist_empty(&user->write_queue) && !user->shutdown;
900#if !LELY_NO_THREADS
901 mtx_unlock(&user->mtx);
902#endif
903
904 if (task && wouldblock)
905 // The operation would block but was canceled before it could be
906 // requeued.
907 io_can_chan_write_post(io_can_chan_write_from_task(task),
909
910 if (post_write)
911 ev_exec_post(user->write_task.exec, &user->write_task);
912
913 set_errc(errsv);
914}
915
916static inline struct io_user_can_chan *
917io_user_can_chan_from_dev(const io_dev_t *dev)
918{
919 assert(dev);
920
921 return structof(dev, struct io_user_can_chan, dev_vptr);
922}
923
924static inline struct io_user_can_chan *
925io_user_can_chan_from_chan(const io_can_chan_t *chan)
926{
927 assert(chan);
928
929 return structof(chan, struct io_user_can_chan, chan_vptr);
930}
931
932static inline struct io_user_can_chan *
933io_user_can_chan_from_svc(const struct io_svc *svc)
934{
935 assert(svc);
936
937 return structof(svc, struct io_user_can_chan, svc);
938}
939
940static int
941io_user_can_chan_on_frame(struct io_user_can_chan *user,
942 const struct io_user_can_frame *frame, int timeout)
943{
944 assert(user);
945 assert(frame);
946
947#if !LELY_NO_THREADS
948 // Compute the absolute timeout for cnd_timedwait().
949 struct timespec ts = { 0, 0 };
950 if (timeout > 0) {
951#if LELY_NO_TIMEOUT
952 timeout = 0;
953 (void)ts;
954#else
955 if (!timespec_get(&ts, TIME_UTC))
956 return -1;
957 timespec_add_msec(&ts, timeout);
958#endif
959 }
960#endif
961
962#if !LELY_NO_THREADS
963 mtx_lock(&user->p_mtx);
964#endif
965 size_t i = 0;
966 for (;;) {
967 // Check if a slot is available in the receive queue.
968 size_t n = 1;
969 i = spscring_p_alloc(&user->rxring, &n);
970 if (n)
971 break;
972 if (!timeout) {
973#if !LELY_NO_THREADS
974 mtx_unlock(&user->p_mtx);
975#endif
977 return -1;
978 }
979 // clang-format off
980 if (!spscring_p_submit_wait(&user->rxring, 1,
981 &io_user_can_chan_p_signal, user))
982 // clang-format on
983 // If the wait condition was already satisfied, try
984 // again.
985 continue;
986#if !LELY_NO_THREADS
987 int result;
988#if !LELY_NO_TIMEOUT
989 // Wait for the buffer to signal that a slot is available, or
990 // time out if that takes too long.
991 if (timeout > 0)
992 result = cnd_timedwait(
993 &user->p_cond, &user->p_mtx, &ts);
994 else
995#endif
996 result = cnd_wait(&user->p_cond, &user->p_mtx);
997 if (result != thrd_success) {
998 mtx_unlock(&user->p_mtx);
999#if !LELY_NO_TIMEOUT
1000 if (result == thrd_timedout)
1002#endif
1003 return -1;
1004 }
1005#endif
1006 }
1007 // Copy the frame to the buffer.
1008 user->rxbuf[i] = *frame;
1009 spscring_p_commit(&user->rxring, 1);
1010#if !LELY_NO_THREADS
1011 mtx_unlock(&user->p_mtx);
1012#endif
1013
1014 return 0;
1015}
1016
1017static void
1018io_user_can_chan_p_signal(struct spscring *ring, void *arg)
1019{
1020 (void)ring;
1021
1022#if LELY_NO_THREADS
1023 (void)arg;
1024#else
1025 struct io_user_can_chan *user = arg;
1026 assert(user);
1027
1028 mtx_lock(&user->p_mtx);
1029 cnd_broadcast(&user->p_cond);
1030 mtx_unlock(&user->p_mtx);
1031#endif
1032}
1033
1034static void
1035io_user_can_chan_c_signal(struct spscring *ring, void *arg)
1036{
1037 (void)ring;
1038 struct io_user_can_chan *user = arg;
1039 assert(user);
1040
1041#if !LELY_NO_THREADS
1042 mtx_lock(&user->c_mtx);
1043 cnd_broadcast(&user->c_cond);
1044 mtx_unlock(&user->c_mtx);
1045#endif
1046
1047#if !LELY_NO_THREADS
1048 mtx_lock(&user->mtx);
1049#endif
1050 int post_read = !user->read_posted && !sllist_empty(&user->read_queue)
1051 && !user->shutdown;
1052 if (post_read)
1053 user->read_posted = 1;
1054#if !LELY_NO_THREADS
1055 mtx_unlock(&user->mtx);
1056#endif
1057 // cppcheck-suppress duplicateCondition
1058 if (post_read)
1059 ev_exec_post(user->read_task.exec, &user->read_task);
1060}
1061
1062static void
1063io_user_can_chan_do_pop(struct io_user_can_chan *user,
1064 struct sllist *read_queue, struct sllist *write_queue,
1065 struct ev_task *task)
1066{
1067 assert(user);
1068 assert(read_queue);
1069 assert(write_queue);
1070
1071 if (!task) {
1074 } else if (sllist_remove(&user->read_queue, &task->_node)) {
1075 sllist_push_back(read_queue, &task->_node);
1076 } else if (sllist_remove(&user->write_queue, &task->_node)) {
1077 sllist_push_back(write_queue, &task->_node);
1078 }
1079}
1080
1081static size_t
1082io_user_can_chan_do_abort_tasks(struct io_user_can_chan *user)
1083{
1084 size_t n = 0;
1085
1086 // Try to abort io_user_can_chan_read_task_func().
1087 // clang-format off
1088 if (user->read_posted && ev_exec_abort(user->read_task.exec,
1089 &user->read_task)) {
1090 // clang-format on
1091 user->read_posted = 0;
1092 n++;
1093 }
1094
1095 // Try to abort io_user_can_chan_write_task_func().
1096 // clang-format off
1097 if (user->write_posted && ev_exec_abort(user->write_task.exec,
1098 &user->write_task)) {
1099 // clang-format on
1100 user->write_posted = 0;
1101 n++;
1102 }
1103
1104 return n;
1105}
1106
1107#endif // !LELY_NO_MALLOC
@ CAN_FLAG_FDF
The FD Format (FDF) flag, formerly known as Extended Data Length (EDL).
Definition: msg.h:54
@ CAN_FLAG_BRS
The Bit Rate Switch (BRS) flag (only available in CAN FD format frames).
Definition: msg.h:62
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
This header file is part of the utilities library; it contains the diagnostic declarations.
@ DIAG_WARNING
A warning.
Definition: diag.h:55
void diag(enum diag_severity severity, int errc, const char *format,...)
Emits a diagnostic message.
Definition: diag.c:171
This header file is part of the utilities library; it contains the native and platform-independent er...
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition: errnum.c:810
@ ERRNUM_NOSYS
Function not supported.
Definition: errnum.h:184
@ ERRNUM_WOULDBLOCK
Operation would block.
Definition: errnum.h:233
@ ERRNUM_INVAL
Invalid argument.
Definition: errnum.h:132
@ ERRNUM_AGAIN
Resource unavailable, try again.
Definition: errnum.h:89
@ ERRNUM_CANCELED
Operation canceled.
Definition: errnum.h:99
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:424
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
@ IO_CAN_BUS_FLAG_BRS
Bit Rate Switch support is enabled.
Definition: can.h:44
@ IO_CAN_BUS_FLAG_FDF
FD Format (formerly Extended Data Length) support is enabled.
Definition: can.h:42
@ IO_CAN_BUS_FLAG_ERR
Reception of error frames is enabled.
Definition: can.h:39
#define LELY_IO_TX_TIMEOUT
The default timeout (in milliseconds) for I/O write operations.
Definition: io2.h:36
This header file is part of the I/O library; it contains the user-defined CAN channel declarations.
int io_user_can_chan_write_t(const struct can_msg *msg, int timeout, void *arg)
The type of function invoked by a user-defined CAN channel when a CAN frame needs to be written.
Definition: can.h:50
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:93
struct io_can_chan_write * io_can_chan_write_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel write operation from a pointer to its completion task.
Definition: can.c:99
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
io_can_chan_t * io_user_can_chan_create(io_ctx_t *ctx, ev_exec_t *exec, int flags, size_t rxlen, int txtimeo, io_user_can_chan_write_t *func, void *arg)
Creates a new user-defined CAN channel.
Definition: can.c:378
#define LELY_IO_USER_CAN_RXLEN
The default receive queue length (in number of CAN frames) of the user-defined CAN channel.
Definition: can.c:47
void io_user_can_chan_destroy(io_can_chan_t *chan)
Destroys a user-defined CAN channel.
Definition: can.c:407
int io_user_can_chan_on_msg(io_can_chan_t *chan, const struct can_msg *msg, const struct timespec *tp, int timeout)
Processes an incoming CAN frame.
Definition: can.c:416
int io_user_can_chan_on_err(io_can_chan_t *chan, const struct can_err *err, const struct timespec *tp, int timeout)
Processes an incoming CAN error frame.
Definition: can.c:441
int timespec_get(struct timespec *ts, int base)
Sets the interval at ts to hold the current calendar time based on the specified time base.
Definition: time.c:32
#define TIME_UTC
An integer constant greater than 0 that designates the UTC time base.
Definition: time.h:207
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
void sllist_push_front(struct sllist *list, struct slnode *node)
Pushes a node to the front of a singly-linked list.
Definition: sllist.h:221
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
This header file is part of the utilities library; it contains the single-producer,...
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:262
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:327
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition: spscring.c:138
int spscring_p_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:163
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition: spscring.c:302
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:98
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
A CAN error frame.
Definition: err.h:28
A CAN or CAN FD format frame.
Definition: msg.h:87
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI...
Definition: msg.h:94
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A CAN channel read operation.
Definition: can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
struct timespec * tp
The address at which to store the system time at which the CAN frame or CAN error frame was received.
Definition: can.h:93
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
A CAN channel write operation.
Definition: can.h:110
const struct can_msg * msg
A pointer to the CAN frame to be written.
Definition: can.h:116
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the write operation.
Definition: can.h:121
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: can.h:126
Definition: ctx.c:38
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
The implementation of a user-defined CAN channel.
Definition: can.c:105
struct ev_task write_task
The task responsible for initiating write operations.
Definition: can.c:135
void * arg
The user-specific value to be passed as the second argument to func.
Definition: can.c:131
ev_exec_t * exec
A pointer to the executor used to execute all I/O tasks.
Definition: can.c:115
unsigned write_posted
A flag indicating whether write_task has been posted to exec.
Definition: can.c:162
const struct io_can_chan_vtbl * chan_vptr
A pointer to the virtual table for the CAN channel interface.
Definition: can.c:109
int txtimeo
The timeout (in milliseconds) when writing a CAN frame asynchronously.
Definition: can.c:122
cnd_t p_cond
The condition variable used to wake up the receive queue producer.
Definition: can.c:140
struct io_svc svc
The I/O service representing the channel.
Definition: can.c:111
struct spscring rxring
The ring buffer used to control the receive queue.
Definition: can.c:147
const struct io_dev_vtbl * dev_vptr
A pointer to the virtual table for the I/O device interface.
Definition: can.c:107
struct ev_task * current_write
The write operation currently being executed.
Definition: can.c:170
io_user_can_chan_write_t * func
A pointer to the function to be invoked when a CAN frame needs to be written.
Definition: can.c:127
cnd_t c_cond
The condition variable used to wake up the receive queue consumer.
Definition: can.c:144
mtx_t p_mtx
The mutex protecting the receive queue producer.
Definition: can.c:138
io_ctx_t * ctx
A pointer to the I/O context with which the channel is registered.
Definition: can.c:113
struct ev_task * current_read
The read operation currently being executed.
Definition: can.c:168
unsigned shutdown
A flag indicating whether the I/O service has been shut down.
Definition: can.c:158
struct ev_task read_task
The task responsible for initiating read operations.
Definition: can.c:133
unsigned read_posted
A flag indicating whether read_task has been posted to exec.
Definition: can.c:160
mtx_t mtx
The mutex protecting the channel and the queues of pending operations.
Definition: can.c:155
struct sllist read_queue
The queue containing pending read operations.
Definition: can.c:164
mtx_t c_mtx
The mutex protecting the receive queue consumer.
Definition: can.c:142
struct sllist write_queue
The queue containing pending write operations.
Definition: can.c:166
int flags
The flags specifying which CAN bus features are enabled.
Definition: can.c:117
struct io_user_can_frame * rxbuf
The receive queue.
Definition: can.c:149
A singly-linked list.
Definition: sllist.h:52
A single-producer, single-consumer ring buffer.
Definition: spscring.h:63
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int cnd_init(cnd_t *cond)
Creates a condition variable.
int cnd_timedwait(cnd_t *cond, mtx_t *mtx, const struct timespec *ts)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is si...
int cnd_broadcast(cnd_t *cond)
Unblocks all of the threads that are blocked on the condition variable at cond at the time of the cal...
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
pthread_cond_t cnd_t
A complete object type that holds an identifier for a condition variable.
Definition: threads.h:78
void cnd_destroy(cnd_t *cond)
Releases all resources used by the condition variable at cond.
int cnd_wait(cnd_t *cond, mtx_t *mtx)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is si...
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run...
@ thrd_timedout
Indicates that the time specified in the call was reached without acquiring the requested resource.
Definition: threads.h:128
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
This header file is part of the utilities library; it contains the time function declarations.
void timespec_add_msec(struct timespec *tp, uint_least64_t msec)
Adds msec milliseconds to the time at tp.
Definition: time.h:141