Lely core libraries 2.3.4
can_chan.c
Go to the documentation of this file.
1
24#include "io.h"
25
26#if !LELY_NO_STDIO && defined(__linux__)
27
28#include "../can.h"
29#include <lely/io2/ctx.h>
30#include <lely/io2/linux/can.h>
31#include <lely/io2/posix/poll.h>
32#include <lely/util/diag.h>
33#include <lely/util/spscring.h>
34#include <lely/util/time.h>
35#include <lely/util/util.h>
36
37#include <assert.h>
38#include <errno.h>
39#include <stdlib.h>
40
41#if !LELY_NO_THREADS
42#include <pthread.h>
43#endif
44#include <unistd.h>
45
46#include <linux/can/raw.h>
47#include <linux/sockios.h>
48#include <sys/ioctl.h>
49
50#include "../posix/fd.h"
51#include "can_attr.h"
52#include "can_err.h"
53#include "can_msg.h"
54
55#ifndef LELY_IO_CAN_RXLEN
57#define LELY_IO_CAN_RXLEN 1024
58#endif
59
61#if LELY_NO_CANFD
62 struct can_frame frame;
63#else
64 struct canfd_frame frame;
65#endif
66 size_t nbytes;
67 struct timespec ts;
68};
69
70static int io_can_fd_set_default(int fd);
71#if LELY_NO_CANFD
72static int io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes,
73 int *pflags, struct timespec *tp, int timeout);
74#else
75static int io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes,
76 int *pflags, struct timespec *tp, int timeout);
77#endif
78#if LELY_NO_CANFD
79static int io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
80 int dontwait);
81#else
82static int io_can_fd_write(int fd, const struct canfd_frame *frame,
83 size_t nbytes, int timeout);
84#endif
85static int io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout);
86
87static io_ctx_t *io_can_chan_impl_dev_get_ctx(const io_dev_t *dev);
88static ev_exec_t *io_can_chan_impl_dev_get_exec(const io_dev_t *dev);
89static size_t io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task);
90static size_t io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task);
91
92// clang-format off
93static const struct io_dev_vtbl io_can_chan_impl_dev_vtbl = {
94 &io_can_chan_impl_dev_get_ctx,
95 &io_can_chan_impl_dev_get_exec,
96 &io_can_chan_impl_dev_cancel,
97 &io_can_chan_impl_dev_abort
98};
99// clang-format on
100
101static io_dev_t *io_can_chan_impl_get_dev(const io_can_chan_t *chan);
102static int io_can_chan_impl_get_flags(const io_can_chan_t *chan);
103static int io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
104 struct can_err *err, struct timespec *tp, int timeout);
105static void io_can_chan_impl_submit_read(
106 io_can_chan_t *chan, struct io_can_chan_read *read);
107static int io_can_chan_impl_write(
108 io_can_chan_t *chan, const struct can_msg *msg, int timeout);
109static void io_can_chan_impl_submit_write(
110 io_can_chan_t *chan, struct io_can_chan_write *write);
111
112// clang-format off
113static const struct io_can_chan_vtbl io_can_chan_impl_vtbl = {
114 &io_can_chan_impl_get_dev,
115 &io_can_chan_impl_get_flags,
116 &io_can_chan_impl_read,
117 &io_can_chan_impl_submit_read,
118 &io_can_chan_impl_write,
119 &io_can_chan_impl_submit_write
120};
121// clang-format on
122
123static void io_can_chan_impl_svc_shutdown(struct io_svc *svc);
124
125// clang-format off
126static const struct io_svc_vtbl io_can_chan_impl_svc_vtbl = {
127 NULL,
128 &io_can_chan_impl_svc_shutdown
129};
130// clang-format on
131
135 const struct io_dev_vtbl *dev_vptr;
145 struct io_svc svc;
158#if !LELY_NO_THREADS
160 pthread_mutex_t c_mtx;
161#endif
166#if !LELY_NO_THREADS
171 pthread_mutex_t mtx;
172#endif
174 int fd;
176 int flags;
180 unsigned shutdown : 1;
182 unsigned rxbuf_posted : 1;
184 unsigned read_posted : 1;
186 unsigned write_posted : 1;
195};
196
197static void io_can_chan_impl_watch_func(
198 struct io_poll_watch *watch, int events);
199static void io_can_chan_impl_rxbuf_task_func(struct ev_task *task);
200static void io_can_chan_impl_read_task_func(struct ev_task *task);
201static void io_can_chan_impl_write_task_func(struct ev_task *task);
202
203static inline struct io_can_chan_impl *io_can_chan_impl_from_dev(
204 const io_dev_t *dev);
205static inline struct io_can_chan_impl *io_can_chan_impl_from_chan(
206 const io_can_chan_t *chan);
207static inline struct io_can_chan_impl *io_can_chan_impl_from_svc(
208 const struct io_svc *svc);
209
210static void io_can_chan_impl_c_signal(struct spscring *ring, void *arg);
211
212static void io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
213 struct sllist *read_queue, struct sllist *write_queue,
214 struct sllist *confirm_queue, struct ev_task *task);
215
216static void io_can_chan_impl_do_read(struct io_can_chan_impl *impl,
217 struct sllist *queue, int *pwouldblock);
218static void io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl,
219 struct sllist *queue, const struct can_msg *msg);
220
221static size_t io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl);
222
223static int io_can_chan_impl_set_fd(
224 struct io_can_chan_impl *impl, int fd, int flags);
225
226void *
227io_can_chan_alloc(void)
228{
229 struct io_can_chan_impl *impl = malloc(sizeof(*impl));
230 if (!impl)
231 return NULL;
232 // Suppress a GCC maybe-uninitialized warning.
233 impl->chan_vptr = NULL;
234 // cppcheck-suppress memleak symbolName=impl
235 return &impl->chan_vptr;
236}
237
238void
239io_can_chan_free(void *ptr)
240{
241 if (ptr)
242 free(io_can_chan_impl_from_chan(ptr));
243}
244
246io_can_chan_init(io_can_chan_t *chan, io_poll_t *poll, ev_exec_t *exec,
247 size_t rxlen)
248{
249 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
250 io_ctx_t *ctx = poll ? io_poll_get_ctx(poll) : NULL;
251
252 if (!rxlen)
253 rxlen = LELY_IO_CAN_RXLEN;
254
255 int errsv = 0;
256
257 impl->dev_vptr = &io_can_chan_impl_dev_vtbl;
258 impl->chan_vptr = &io_can_chan_impl_vtbl;
259
260 impl->poll = poll;
261
262 impl->svc = (struct io_svc)IO_SVC_INIT(&io_can_chan_impl_svc_vtbl);
263 impl->ctx = ctx;
264
265 impl->exec = exec;
266
267 impl->watch = (struct io_poll_watch)IO_POLL_WATCH_INIT(
268 &io_can_chan_impl_watch_func);
269
270 impl->rxbuf_task = (struct ev_task)EV_TASK_INIT(
271 impl->exec, &io_can_chan_impl_rxbuf_task_func);
272 impl->read_task = (struct ev_task)EV_TASK_INIT(
273 impl->exec, &io_can_chan_impl_read_task_func);
274 impl->write_task = (struct ev_task)EV_TASK_INIT(
275 impl->exec, &io_can_chan_impl_write_task_func);
276
277#if !LELY_NO_THREADS
278 if ((errsv = pthread_mutex_init(&impl->c_mtx, NULL)))
279 goto error_init_c_mtx;
280#endif
281
282 spscring_init(&impl->rxring, rxlen);
283 impl->rxbuf = calloc(rxlen, sizeof(struct io_can_frame));
284 if (!impl->rxbuf) {
285 errsv = errno;
286 goto error_alloc_rxbuf;
287 }
288
289#if !LELY_NO_THREADS
290 if ((errsv = pthread_mutex_init(&impl->mtx, NULL)))
291 goto error_init_mtx;
292#endif
293
294 impl->fd = -1;
295 impl->flags = 0;
296 impl->events = 0;
297
298 impl->shutdown = 0;
299 impl->rxbuf_posted = 0;
300 impl->read_posted = 0;
301 impl->write_posted = 0;
302
303 sllist_init(&impl->read_queue);
304 sllist_init(&impl->write_queue);
306 impl->current_write = NULL;
307
308 if (impl->ctx)
309 io_ctx_insert(impl->ctx, &impl->svc);
310
311 return chan;
312
313#if !LELY_NO_THREADS
314 // pthread_mutex_destroy(&impl->mtx);
315error_init_mtx:
316#endif
317 free(impl->rxbuf);
318error_alloc_rxbuf:
319#if !LELY_NO_THREADS
320 pthread_mutex_destroy(&impl->c_mtx);
321error_init_c_mtx:
322#endif
323 errno = errsv;
324 return NULL;
325}
326
327void
328io_can_chan_fini(io_can_chan_t *chan)
329{
330 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
331
332 if (impl->ctx)
333 io_ctx_remove(impl->ctx, &impl->svc);
334 // Cancel all pending operations.
335 io_can_chan_impl_svc_shutdown(&impl->svc);
336
337#if !LELY_NO_THREADS
338 // Abort any consumer wait operation running in a task.
339 pthread_mutex_lock(&impl->c_mtx);
341 pthread_mutex_unlock(&impl->c_mtx);
342
343 int warning = 0;
344 pthread_mutex_lock(&impl->mtx);
345 // If necessary, busy-wait until io_can_chan_impl_rxbuf_task_func(),
346 // io_can_chan_impl_read_task_func() and
347 // io_can_chan_impl_write_task_func() complete.
348 while (impl->rxbuf_posted || impl->read_posted || impl->write_posted) {
349 if (io_can_chan_impl_do_abort_tasks(impl))
350 continue;
351 pthread_mutex_unlock(&impl->mtx);
352 if (!warning) {
353 warning = 1;
355 "io_can_chan_fini() invoked with pending operations");
356 }
357 sched_yield();
358 pthread_mutex_lock(&impl->mtx);
359 }
360 pthread_mutex_unlock(&impl->mtx);
361#endif
362
363 // Close the socket.
364 if (impl->fd != -1) {
365 if (impl->events)
366 io_poll_watch(impl->poll, impl->fd, 0, &impl->watch);
367 close(impl->fd);
368 }
369
370#if !LELY_NO_THREADS
371 pthread_mutex_destroy(&impl->mtx);
372#endif
373
374 free(impl->rxbuf);
375
376#if !LELY_NO_THREADS
377 pthread_mutex_destroy(&impl->c_mtx);
378#endif
379}
380
383{
384 int errsv = 0;
385
386 io_can_chan_t *chan = io_can_chan_alloc();
387 if (!chan) {
388 errsv = errno;
389 goto error_alloc;
390 }
391
392 io_can_chan_t *tmp = io_can_chan_init(chan, poll, exec, rxlen);
393 if (!tmp) {
394 errsv = errno;
395 goto error_init;
396 }
397 chan = tmp;
398
399 return chan;
400
401error_init:
402 io_can_chan_free((void *)chan);
403error_alloc:
404 errno = errsv;
405 return NULL;
406}
407
408void
410{
411 if (chan) {
412 io_can_chan_fini(chan);
413 io_can_chan_free((void *)chan);
414 }
415}
416
417int
419{
420 const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
421
422#if !LELY_NO_THREADS
423 pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
424#endif
425 int fd = impl->fd;
426#if !LELY_NO_THREADS
427 pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
428#endif
429 return fd;
430}
431
432int
434{
435 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
436
438 errno = EINVAL;
439 return -1;
440 }
441
442 int errsv = 0;
443
444 int fd = socket(AF_CAN, SOCK_RAW | SOCK_CLOEXEC, CAN_RAW);
445 if (fd == -1) {
446 errsv = errno;
447 goto error_socket;
448 }
449
450 struct sockaddr_can addr = { .can_family = AF_CAN,
451 .can_ifindex = io_can_ctrl_get_index(ctrl) };
452
453 if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
454 errsv = errno;
455 goto error_bind;
456 }
457
458 if (flags & IO_CAN_BUS_FLAG_ERR) {
459 can_err_mask_t optval = CAN_ERR_MASK;
460 // clang-format off
461 if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
462 sizeof(optval)) == -1) {
463 // clang-format on
464 errsv = errno;
465 goto error_setsockopt;
466 }
467 }
468
469#if !LELY_NO_CANFD
470 if (flags & IO_CAN_BUS_FLAG_FDF) {
471 int optval = 1;
472 // clang-format off
473 if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
474 sizeof(optval)) == -1) {
475 // clang-format on
476 errsv = errno;
477 goto error_setsockopt;
478 }
479 }
480#endif
481
482 if (io_can_fd_set_default(fd) == -1) {
483 errsv = errno;
484 goto error_set_default;
485 }
486
487 fd = io_can_chan_impl_set_fd(impl, fd, flags);
488 if (fd != -1)
489 close(fd);
490
491 return 0;
492
493error_set_default:
494error_setsockopt:
495error_bind:
496 close(fd);
497error_socket:
498 errno = errsv;
499 return -1;
500}
501
502int
504{
505 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
506
507 struct sockaddr_can addr = { .can_family = AF_UNSPEC };
508 socklen_t addrlen = sizeof(addr);
509 if (getsockname(fd, (struct sockaddr *)&addr, &addrlen) == -1)
510 return -1;
511 if (addrlen < sizeof(addr) || addr.can_family != AF_CAN) {
512 errno = ENODEV;
513 return -1;
514 }
515 unsigned int ifindex = addr.can_ifindex;
516
517 struct io_can_attr attr = IO_CAN_ATTR_INIT;
518 if (io_can_attr_get(&attr, ifindex) == -1)
519 return -1;
520 int flags = attr.flags;
521
522 {
523 can_err_mask_t optval = 0;
524 socklen_t optlen = sizeof(optval);
525 // clang-format off
526 if (getsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
527 &optlen) == -1)
528 // clang-format on
529 return -1;
530 if (optval & CAN_ERR_MASK)
531 flags |= IO_CAN_BUS_FLAG_ERR;
532 }
533
534#if !LELY_NO_CANFD
535 // Check if CAN FD frames are allowed. If the check fails, we assume
536 // they are not.
537 {
538 int errsv = errno;
539 int optval = 0;
540 socklen_t optlen = sizeof(optval);
541 // clang-format off
542 if (!getsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
543 &optlen) && !optval)
544 // clang-format on
546 errno = errsv;
547 }
548#endif
549
550 if (io_can_fd_set_default(fd) == -1)
551 return -1;
552
553 fd = io_can_chan_impl_set_fd(impl, fd, flags);
554 if (fd != -1)
555 close(fd);
556
557 return 0;
558}
559
560int
562{
563 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
564
565 return io_can_chan_impl_set_fd(impl, -1, 0);
566}
567
568int
570{
571 return io_can_chan_get_handle(chan) != -1;
572}
573
574int
576{
577 int fd = io_can_chan_release(chan);
578 return fd != -1 ? close(fd) : 0;
579}
580
581static int
582io_can_fd_set_default(int fd)
583{
584 int optval;
585
586 // Enable local loopback.
587 optval = 1;
588 // clang-format off
589 if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_LOOPBACK, &optval,
590 sizeof(optval)) == -1)
591 // clang-format on
592 return -1;
593
594 // Enable the reception of CAN frames sent by this socket so we can
595 // check for successful transmission.
596 optval = 1;
597 // clang-format off
598 if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_RECV_OWN_MSGS, &optval,
599 sizeof(optval)) == -1)
600 // clang-format on
601 return -1;
602
603 // Set the size of the send buffer to its minimum value. This causes
604 // write operations to block (or return EAGAIN) instead of returning
605 // ENOBUFS.
606 optval = 0;
607 // clang-format off
608 if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval))
609 == -1)
610 // clang-format on
611 return -1;
612
613 return 0;
614}
615
616static int
617#if LELY_NO_CANFD
618io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes, int *pflags,
619 struct timespec *tp, int timeout)
620#else
621io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes, int *pflags,
622 struct timespec *tp, int timeout)
623#endif
624{
625 struct iovec iov = { .iov_base = (void *)frame,
626 .iov_len = sizeof(*frame) };
627 struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
628
629 ssize_t result;
630 for (;;) {
631 result = io_fd_recvmsg(fd, &msg, 0, timeout);
632 if (result < 0)
633 return result;
634#if LELY_NO_CANFD
635 if (result == CAN_MTU)
636#else
637 if (result == CAN_MTU || result == CANFD_MTU)
638#endif
639 break;
640 if (timeout > 0)
641 timeout = 0;
642 }
643
644 if (pnbytes)
645 *pnbytes = result;
646
647 if (pflags)
648 *pflags = msg.msg_flags;
649
650 if (tp) {
651 if (msg.msg_flags & MSG_CONFIRM) {
652 // Ignore the timestamp for write confirmations.
653 *tp = (struct timespec){ 0, 0 };
654 } else {
655 struct timeval tv = { 0, 0 };
656 if (ioctl(fd, SIOCGSTAMP, &tv) == -1)
657 return -1;
658 tp->tv_sec = tv.tv_sec;
659 tp->tv_nsec = tv.tv_usec * 1000;
660 }
661 }
662
663 return 0;
664}
665
666static int
667#if LELY_NO_CANFD
668io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
669 int timeout)
670#else
671io_can_fd_write(int fd, const struct canfd_frame *frame, size_t nbytes,
672 int timeout)
673#endif
674{
675 struct iovec iov = { .iov_base = (void *)frame, .iov_len = nbytes };
676 struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
677
678 return io_fd_sendmsg(fd, &msg, 0, timeout) > 0 ? 0 : -1;
679}
680
681static int
682io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout)
683{
684 assert(msg);
685
686 // Convert the frame to the SocketCAN format.
687 struct io_can_frame frame;
688#if !LELY_NO_CANFD
689 if (msg->flags & CAN_FLAG_FDF) {
690 if (can_msg2canfd_frame(msg, &frame.frame) == -1) {
691 errno = EINVAL;
692 return -1;
693 }
694 frame.nbytes = CANFD_MTU;
695 } else {
696#endif
697 if (can_msg2can_frame(msg, (struct can_frame *)&frame.frame)
698 == -1) {
699 errno = EINVAL;
700 return -1;
701 }
702 frame.nbytes = CAN_MTU;
703#if !LELY_NO_CANFD
704 }
705#endif
706
707 return io_can_fd_write(fd, &frame.frame, frame.nbytes, timeout);
708}
709
710static io_ctx_t *
711io_can_chan_impl_dev_get_ctx(const io_dev_t *dev)
712{
713 const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
714
715 return impl->ctx;
716}
717
718static ev_exec_t *
719io_can_chan_impl_dev_get_exec(const io_dev_t *dev)
720{
721 const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
722
723 return impl->exec;
724}
725
726static size_t
727io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task)
728{
729 struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
730
731 size_t n = 0;
732
733 struct sllist read_queue, write_queue, confirm_queue;
734 sllist_init(&read_queue);
735 sllist_init(&write_queue);
736 sllist_init(&confirm_queue);
737
738#if !LELY_NO_THREADS
739 pthread_mutex_lock(&impl->mtx);
740#endif
741 io_can_chan_impl_do_pop(
742 impl, &read_queue, &write_queue, &confirm_queue, task);
743 // Mark the ongoing write operation as canceled, if necessary.
744 if (impl->current_write && (!task || task == impl->current_write)) {
745 impl->current_write = NULL;
746 n++;
747 }
748#if !LELY_NO_THREADS
749 pthread_mutex_unlock(&impl->mtx);
750#endif
751
752 size_t nread = io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
753 n = n < SIZE_MAX - nread ? n + nread : SIZE_MAX;
754 size_t nwrite = io_can_chan_write_queue_post(&write_queue, ECANCELED);
755 n = n < SIZE_MAX - nwrite ? n + nwrite : SIZE_MAX;
756 size_t nconfirm =
757 io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
758 n = n < SIZE_MAX - nconfirm ? n + nconfirm : SIZE_MAX;
759
760 return n;
761}
762
763static size_t
764io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task)
765{
766 struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
767
768 struct sllist queue;
769 sllist_init(&queue);
770
771#if !LELY_NO_THREADS
772 pthread_mutex_lock(&impl->mtx);
773#endif
774 io_can_chan_impl_do_pop(impl, &queue, &queue, &queue, task);
775#if !LELY_NO_THREADS
776 pthread_mutex_unlock(&impl->mtx);
777#endif
778
779 return ev_task_queue_abort(&queue);
780}
781static io_dev_t *
782io_can_chan_impl_get_dev(const io_can_chan_t *chan)
783{
784 const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
785
786 return &impl->dev_vptr;
787}
788
789static int
790io_can_chan_impl_get_flags(const io_can_chan_t *chan)
791{
792 const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
793
794#if !LELY_NO_THREADS
795 pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
796#endif
797 int flags = impl->flags;
798#if !LELY_NO_THREADS
799 pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
800#endif
801 return flags;
802}
803
804static int
805io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
806 struct can_err *err, struct timespec *tp, int timeout)
807{
808 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
809
810 struct io_can_frame frame_;
811 struct io_can_frame *frame = &frame_;
812#if !LELY_NO_THREADS
813 pthread_mutex_lock(&impl->c_mtx);
814#endif
815 size_t n = 0;
816 for (;;) {
817 // Check if a frame is available in the receive queue.
818 n = 1;
819 size_t i = spscring_c_alloc(&impl->rxring, &n);
820 if (n) {
821 frame = &impl->rxbuf[i];
822 break;
823 }
824#if !LELY_NO_THREADS
825 pthread_mutex_unlock(&impl->c_mtx);
826 pthread_mutex_lock(&impl->mtx);
827#endif
828 // If not, read a frame directly.
829 int fd = impl->fd;
830#if !LELY_NO_THREADS
831 pthread_mutex_unlock(&impl->mtx);
832#endif
833 int flags = 0;
834 // clang-format off
835 if (io_can_fd_read(fd, &frame->frame, &frame->nbytes, &flags,
836 &frame->ts, timeout) < 0)
837 // clang-format on
838 return -1;
839 // Process the frame unless it is a write confirmation.
840 if (!(flags & MSG_CONFIRM))
841 break;
842 // Convert the frame from the SocketCAN format.
843 void *src = &frame->frame;
844 struct can_msg msg;
845#if !LELY_NO_CANFD
846 if (frame->nbytes == CANFD_MTU)
847 canfd_frame2can_msg(src, &msg);
848 else
849#endif
850 can_frame2can_msg(src, &msg);
851 // Process the write confirmation.
852 struct sllist queue;
853 sllist_init(&queue);
854#if !LELY_NO_THREADS
855 pthread_mutex_lock(&impl->mtx);
856#endif
857 io_can_chan_impl_do_confirm(impl, &queue, &msg);
858#if !LELY_NO_THREADS
859 pthread_mutex_unlock(&impl->mtx);
860#endif
861 ev_task_queue_post(&queue);
862 // Since the timeout is relative, we can only use a positive
863 // value once.
864 if (timeout > 0)
865 timeout = 0;
866#if !LELY_NO_THREADS
867 pthread_mutex_lock(&impl->c_mtx);
868#endif
869 }
870 // Parse the frame.
871 void *data = &frame->frame;
872 int is_err = can_frame2can_err(data, err);
873 if (!is_err && msg) {
874#if !LELY_NO_CANFD
875 if (frame->nbytes == CANFD_MTU)
876 canfd_frame2can_msg(data, msg);
877 else
878#endif
879 can_frame2can_msg(data, msg);
880 }
881 if (tp)
882 *tp = frame->ts;
883 // Remove the frame from the receive queue, if necessary.
884 if (n) {
885 spscring_c_commit(&impl->rxring, n);
886#if !LELY_NO_THREADS
887 pthread_mutex_unlock(&impl->c_mtx);
888#endif
889 }
890
891 return is_err == -1 ? -1 : !is_err;
892}
893
894static void
895io_can_chan_impl_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
896{
897 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
898 assert(read);
899 struct ev_task *task = &read->task;
900
901 if (!task->exec)
902 task->exec = impl->exec;
903 assert(task->exec);
905
906#if !LELY_NO_THREADS
907 pthread_mutex_lock(&impl->mtx);
908#endif
909 if (impl->shutdown) {
910#if !LELY_NO_THREADS
911 pthread_mutex_unlock(&impl->mtx);
912#endif
913 io_can_chan_read_post(read, -1, ECANCELED);
914 } else {
915 int post_read = !impl->read_posted
916 && sllist_empty(&impl->read_queue);
917 sllist_push_back(&impl->read_queue, &task->_node);
918 if (post_read)
919 impl->read_posted = 1;
920#if !LELY_NO_THREADS
921 pthread_mutex_unlock(&impl->mtx);
922#endif
923 assert(impl->read_task.exec);
924 if (post_read)
925 ev_exec_post(impl->read_task.exec, &impl->read_task);
926 }
927}
928
929static int
930io_can_chan_impl_write(
931 io_can_chan_t *chan, const struct can_msg *msg, int timeout)
932{
933 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
934
935#if !LELY_NO_CANFD
936 int flags = 0;
937 if (msg->flags & CAN_FLAG_FDF)
939 if (msg->flags & CAN_FLAG_BRS)
941#endif
942
943#if !LELY_NO_THREADS
944 pthread_mutex_lock(&impl->mtx);
945#endif
946#if !LELY_NO_CANFD
947 if ((flags & impl->flags) != flags) {
948#if !LELY_NO_THREADS
949 pthread_mutex_unlock(&impl->mtx);
950#endif
951 errno = EINVAL;
952 return -1;
953 }
954#endif
955 int fd = impl->fd;
956#if !LELY_NO_THREADS
957 pthread_mutex_unlock(&impl->mtx);
958#endif
959
960 return io_can_fd_write_msg(fd, msg, timeout);
961}
962
963static void
964io_can_chan_impl_submit_write(
965 io_can_chan_t *chan, struct io_can_chan_write *write)
966{
967 struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
968 assert(write);
969 assert(write->msg);
970 struct ev_task *task = &write->task;
971
972#if !LELY_NO_CANFD
973 int flags = 0;
974 if (write->msg->flags & CAN_FLAG_FDF)
975 flags |= IO_CAN_BUS_FLAG_FDF;
976 if (write->msg->flags & CAN_FLAG_BRS)
977 flags |= IO_CAN_BUS_FLAG_BRS;
978#endif
979
980 if (!task->exec)
981 task->exec = impl->exec;
982 assert(task->exec);
984
985#if !LELY_NO_THREADS
986 pthread_mutex_lock(&impl->mtx);
987#endif
988 if (impl->shutdown) {
989#if !LELY_NO_THREADS
990 pthread_mutex_unlock(&impl->mtx);
991#endif
992 io_can_chan_write_post(write, ECANCELED);
993#if !LELY_NO_CANFD
994 } else if ((flags & impl->flags) != flags) {
995#if !LELY_NO_THREADS
996 pthread_mutex_unlock(&impl->mtx);
997#endif
998 io_can_chan_write_post(write, EINVAL);
999#endif
1000 } else {
1001 int post_write = !impl->write_posted
1002 && sllist_empty(&impl->write_queue);
1003 sllist_push_back(&impl->write_queue, &task->_node);
1004 if (post_write)
1005 impl->write_posted = 1;
1006#if !LELY_NO_THREADS
1007 pthread_mutex_unlock(&impl->mtx);
1008#endif
1009 assert(impl->write_task.exec);
1010 if (post_write)
1011 ev_exec_post(impl->write_task.exec, &impl->write_task);
1012 }
1013}
1014
1015static void
1016io_can_chan_impl_svc_shutdown(struct io_svc *svc)
1017{
1018 struct io_can_chan_impl *impl = io_can_chan_impl_from_svc(svc);
1019 io_dev_t *dev = &impl->dev_vptr;
1020
1021#if !LELY_NO_THREADS
1022 pthread_mutex_lock(&impl->mtx);
1023#endif
1024 int shutdown = !impl->shutdown;
1025 impl->shutdown = 1;
1026 if (shutdown) {
1027 if (impl->events) {
1028 impl->events = 0;
1029 // Stop monitoring I/O events.
1030 io_poll_watch(impl->poll, impl->fd, impl->events,
1031 &impl->watch);
1032 }
1033 // Try to abort io_can_chan_impl_rxbuf_task_func(),
1034 // io_can_chan_impl_read_task_func() and
1035 // io_can_chan_impl_write_task_func().
1036 io_can_chan_impl_do_abort_tasks(impl);
1037 }
1038#if !LELY_NO_THREADS
1039 pthread_mutex_unlock(&impl->mtx);
1040#endif
1041 // cppcheck-suppress duplicateCondition
1042 if (shutdown)
1043 // Cancel all pending operations.
1044 io_can_chan_impl_dev_cancel(dev, NULL);
1045}
1046
1047static void
1048io_can_chan_impl_watch_func(struct io_poll_watch *watch, int events)
1049{
1050 assert(watch);
1051 struct io_can_chan_impl *impl =
1053
1054#if !LELY_NO_THREADS
1055 pthread_mutex_lock(&impl->mtx);
1056#endif
1057 // Continue monitoring events that are not otherwise handled.
1058 if ((events & IO_EVENT_ERR) || impl->fd == -1 || impl->shutdown) {
1059 impl->events = 0;
1060 } else if ((impl->events &= ~events) != 0) {
1061 int errsv = errno;
1062 // clang-format off
1063 if (io_poll_watch(impl->poll, impl->fd, impl->events,
1064 &impl->watch) == -1) {
1065 // clang-format on
1066 impl->events = 0;
1068 }
1069 errno = errsv;
1070 }
1071
1072 // Process incoming CAN frames.
1073 int post_rxbuf = 0;
1074 if ((events & (IO_EVENT_IN | IO_EVENT_ERR)) && !impl->shutdown) {
1075 post_rxbuf = !impl->rxbuf_posted;
1076 impl->rxbuf_posted = 1;
1077 }
1078
1079 // Retry any pending write operations.
1080 int post_write = 0;
1081 if ((events & (IO_EVENT_OUT | IO_EVENT_ERR))
1082 && !sllist_empty(&impl->write_queue)
1083 && !impl->shutdown) {
1084 post_write = !impl->write_posted;
1085 impl->write_posted = 1;
1086 }
1087#if !LELY_NO_THREADS
1088 pthread_mutex_unlock(&impl->mtx);
1089#endif
1090
1091 if (post_rxbuf)
1092 ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1093 if (post_write)
1094 ev_exec_post(impl->write_task.exec, &impl->write_task);
1095}
1096
1097static void
1098io_can_chan_impl_rxbuf_task_func(struct ev_task *task)
1099{
1100 assert(task);
1101 struct io_can_chan_impl *impl =
1102 structof(task, struct io_can_chan_impl, rxbuf_task);
1103
1104 int errsv = errno;
1105
1106 struct sllist queue;
1107 sllist_init(&queue);
1108
1109 int result = 0;
1110 int errc = 0;
1111 int wouldblock = 0;
1112
1113#if !LELY_NO_THREADS
1114 pthread_mutex_lock(&impl->mtx);
1115#endif
1116 // Only try to fill the receive queue if there are pending read
1117 // operations and there is an empty slot in the queue, or if there are
1118 // pending write confirmations.
1119 // clang-format off
1120 while ((!sllist_empty(&impl->read_queue)
1121 && spscring_p_capacity(&impl->rxring))
1122 || !sllist_empty(&impl->confirm_queue)) {
1123 // clang-format on
1124 int fd = impl->fd;
1125#if !LELY_NO_THREADS
1126 pthread_mutex_unlock(&impl->mtx);
1127#endif
1128
1129 struct io_can_frame frame_;
1130 struct io_can_frame *frame = &frame_;
1131 // Try to obtain an empty slot in the receive queue.
1132 size_t n = 1;
1133 size_t i = spscring_p_alloc(&impl->rxring, &n);
1134 if (n)
1135 frame = &impl->rxbuf[i];
1136
1137 // Try to read a CAN or CAN FD format frame from the CAN bus.
1138 int flags = 0;
1139 result = io_can_fd_read(fd, &frame->frame, &frame->nbytes,
1140 &flags, &frame->ts,
1141 impl->poll ? 0 : LELY_IO_RX_TIMEOUT);
1142 errc = !result ? 0 : errno;
1143 wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1144
1145 // Convert the frame from the SocketCAN format if it is a write
1146 // confirmation.
1147 struct can_msg msg;
1148 if (!result && (flags & MSG_CONFIRM)) {
1149 void *src = &frame->frame;
1150#if !LELY_NO_CANFD
1151 if (frame->nbytes == CANFD_MTU)
1152 canfd_frame2can_msg(src, &msg);
1153 else
1154#endif
1155 can_frame2can_msg(src, &msg);
1156 }
1157
1158 // Make the frame available for reading.
1159 if (!result && !(flags & MSG_CONFIRM))
1160 spscring_p_commit(&impl->rxring, n);
1161
1162#if !LELY_NO_THREADS
1163 pthread_mutex_lock(&impl->mtx);
1164#endif
1165 // Process the write confirmation, if any.
1166 if (!result && flags & MSG_CONFIRM)
1167 io_can_chan_impl_do_confirm(impl, &queue, &msg);
1168
1169 // Stop if the operation did or would block, or if an error
1170 // occurred.
1171 if (!impl->poll || result < 0)
1172 break;
1173 }
1174 // Cancel all pending read operations on error.
1175 if (result < 0 && !wouldblock) {
1176 io_can_chan_impl_do_read(impl, &queue, NULL);
1177 while ((task = ev_task_from_node(
1178 sllist_pop_front(&impl->read_queue)))) {
1179 struct io_can_chan_read *read =
1181 read->r.result = result;
1182 read->r.errc = errc;
1183 sllist_push_back(&queue, &task->_node);
1184 }
1185 }
1186 // clang-format off
1187 int post_rxbuf = !(sllist_empty(&impl->read_queue)
1188 && sllist_empty(&impl->confirm_queue))
1189 && impl->fd != -1 && !impl->shutdown;
1190 // clang-format on
1191 // If a read operation would block, start monitoring the file descriptor
1192 // for I/O events.
1193 if (post_rxbuf && impl->poll && wouldblock) {
1194 int events = impl->events | IO_EVENT_IN;
1195 // clang-format off
1196 if (!io_poll_watch(impl->poll, impl->fd, events,
1197 &impl->watch)) {
1198 // clang-format on
1199 impl->events = events;
1200 // Do not repost this thask unless registering the file
1201 // descriptor fails.
1202 post_rxbuf = 0;
1203 }
1204 }
1205 impl->rxbuf_posted = post_rxbuf;
1206#if !LELY_NO_THREADS
1207 pthread_mutex_unlock(&impl->mtx);
1208#endif
1209
1210 ev_task_queue_post(&queue);
1211
1212 if (post_rxbuf)
1213 ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1214
1215 errno = errsv;
1216}
1217
1218static void
1219io_can_chan_impl_read_task_func(struct ev_task *task)
1220{
1221 assert(task);
1222 struct io_can_chan_impl *impl =
1223 structof(task, struct io_can_chan_impl, read_task);
1224
1225 struct sllist queue;
1226 sllist_init(&queue);
1227
1228#if !LELY_NO_THREADS
1229 pthread_mutex_lock(&impl->mtx);
1230#endif
1231 // Process any pending read operations that can be satisfied.
1232 int wouldblock = 0;
1233 io_can_chan_impl_do_read(impl, &queue, &wouldblock);
1234 int post_rxbuf = 0;
1235 // Repost this task if any read operations remain in the queue.
1236 int post_read = !sllist_empty(&impl->read_queue) && !impl->shutdown;
1237 // Register a wait operation if the receive queue is empty.
1238 if (post_read && wouldblock) {
1239#if !LELY_NO_THREADS
1240 pthread_mutex_lock(&impl->c_mtx);
1241#endif
1242 // Do not repost this task unless the wait condition can be
1243 // satisfied immediately.
1244 post_read = !spscring_c_submit_wait(&impl->rxring, 1,
1245 io_can_chan_impl_c_signal, impl);
1246#if !LELY_NO_THREADS
1247 pthread_mutex_unlock(&impl->c_mtx);
1248#endif
1249 if (!post_read)
1250 // If the receive queue is empty, start reading more CAN
1251 // frames, unless we're already waiting for one.
1252 post_rxbuf = !impl->rxbuf_posted
1253 && !(impl->events & IO_EVENT_IN)
1254 && impl->fd != -1 && !impl->shutdown;
1255 }
1256 // cppcheck-suppress knownConditionTrueFalse
1257 if (post_rxbuf)
1258 impl->rxbuf_posted = 1;
1259 impl->read_posted = post_read;
1260#if !LELY_NO_THREADS
1261 pthread_mutex_unlock(&impl->mtx);
1262#endif
1263
1264 ev_task_queue_post(&queue);
1265
1266 // cppcheck-suppress knownConditionTrueFalse
1267 if (post_rxbuf)
1268 ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1269
1270 if (post_read)
1271 ev_exec_post(impl->read_task.exec, &impl->read_task);
1272}
1273
1274static void
1275io_can_chan_impl_write_task_func(struct ev_task *task)
1276{
1277 assert(task);
1278 struct io_can_chan_impl *impl =
1279 structof(task, struct io_can_chan_impl, write_task);
1280
1281 int errsv = errno;
1282
1283 int wouldblock = 0;
1284
1285#if !LELY_NO_THREADS
1286 pthread_mutex_lock(&impl->mtx);
1287#endif
1288 // Try to process all pending write operations at once, unless we're in
1289 // blocking mode.
1290 while ((task = impl->current_write = ev_task_from_node(
1291 sllist_pop_front(&impl->write_queue)))) {
1292 int fd = impl->fd;
1293#if !LELY_NO_THREADS
1294 pthread_mutex_unlock(&impl->mtx);
1295#endif
1296 struct io_can_chan_write *write =
1298 int result = io_can_fd_write_msg(fd, write->msg,
1299 impl->poll ? 0 : LELY_IO_TX_TIMEOUT);
1300 int errc = !result ? 0 : errno;
1301 wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1302 if (!wouldblock && errc)
1303 // The operation failed immediately.
1304 io_can_chan_write_post(write, errc);
1305#if !LELY_NO_THREADS
1306 pthread_mutex_lock(&impl->mtx);
1307#endif
1308 if (!errc)
1309 // Wait for the write confirmation.
1310 sllist_push_back(&impl->confirm_queue, &task->_node);
1311 if (task == impl->current_write) {
1312 // Put the write operation back on the queue if it would
1313 // block, unless it was canceled.
1314 if (wouldblock) {
1316 &task->_node);
1317 task = NULL;
1318 }
1319 impl->current_write = NULL;
1320 }
1321 assert(!impl->current_write);
1322 // Stop if the operation did or would block.
1323 if (!impl->poll || wouldblock)
1324 break;
1325 }
1326 // If we're waiting for a write confirmation, start reading more CAN
1327 // frames, unless we're already waiting for one.
1328 int post_rxbuf = !impl->rxbuf_posted
1329 && !sllist_empty(&impl->confirm_queue)
1330 && !(impl->events & IO_EVENT_IN) && impl->fd != -1
1331 && !impl->shutdown;
1332 if (post_rxbuf)
1333 impl->rxbuf_posted = 1;
1334 // Repost this task if any write operations remain in the queue.
1335 int post_write = !sllist_empty(&impl->write_queue) && impl->fd != -1
1336 && !impl->shutdown;
1337 // If a write operation would block, start monitoring the file
1338 // descriptor for I/O events.
1339 if (post_write && impl->poll && wouldblock) {
1340 int events = impl->events | IO_EVENT_OUT;
1341 // clang-format off
1342 if (!io_poll_watch(impl->poll, impl->fd, events,
1343 &impl->watch)) {
1344 // clang-format on
1345 impl->events = events;
1346 // Do not repost this task unless registering the file
1347 // descriptor fails.
1348 post_write = 0;
1349 }
1350 }
1351 impl->write_posted = post_write;
1352#if !LELY_NO_THREADS
1353 pthread_mutex_unlock(&impl->mtx);
1354#endif
1355
1356 if (task && wouldblock)
1357 // The operation would block but was canceled before it could be
1358 // requeued.
1359 io_can_chan_write_post(
1360 io_can_chan_write_from_task(task), ECANCELED);
1361
1362 if (post_rxbuf)
1363 ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1364
1365 if (post_write)
1366 ev_exec_post(impl->write_task.exec, &impl->write_task);
1367
1368 errno = errsv;
1369}
1370
1371static inline struct io_can_chan_impl *
1372io_can_chan_impl_from_dev(const io_dev_t *dev)
1373{
1374 assert(dev);
1375
1376 return structof(dev, struct io_can_chan_impl, dev_vptr);
1377}
1378
1379static inline struct io_can_chan_impl *
1380io_can_chan_impl_from_chan(const io_can_chan_t *chan)
1381{
1382 assert(chan);
1383
1384 return structof(chan, struct io_can_chan_impl, chan_vptr);
1385}
1386
1387static inline struct io_can_chan_impl *
1388io_can_chan_impl_from_svc(const struct io_svc *svc)
1389{
1390 assert(svc);
1391
1392 return structof(svc, struct io_can_chan_impl, svc);
1393}
1394
1395static void
1396io_can_chan_impl_c_signal(struct spscring *ring, void *arg)
1397{
1398 (void)ring;
1399 struct io_can_chan_impl *impl = arg;
1400 assert(impl);
1401
1402#if !LELY_NO_THREADS
1403 pthread_mutex_lock(&impl->mtx);
1404#endif
1405 int post_read = !impl->read_posted && !sllist_empty(&impl->read_queue)
1406 && !impl->shutdown;
1407 if (post_read)
1408 impl->read_posted = 1;
1409#if !LELY_NO_THREADS
1410 pthread_mutex_unlock(&impl->mtx);
1411#endif
1412 // cppcheck-suppress duplicateCondition
1413 if (post_read)
1414 ev_exec_post(impl->read_task.exec, &impl->read_task);
1415}
1416
1417static void
1418io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
1419 struct sllist *read_queue, struct sllist *write_queue,
1420 struct sllist *confirm_queue, struct ev_task *task)
1421{
1422 assert(impl);
1423 assert(read_queue);
1424 assert(write_queue);
1425 assert(confirm_queue);
1426
1427 if (!task) {
1431 } else if (sllist_remove(&impl->read_queue, &task->_node)) {
1432 sllist_push_back(read_queue, &task->_node);
1433 } else if (sllist_remove(&impl->write_queue, &task->_node)) {
1434 sllist_push_back(write_queue, &task->_node);
1435 } else if (sllist_remove(&impl->confirm_queue, &task->_node)) {
1436 sllist_push_back(confirm_queue, &task->_node);
1437 }
1438}
1439
1440static void
1441io_can_chan_impl_do_read(struct io_can_chan_impl *impl, struct sllist *queue,
1442 int *pwouldblock)
1443{
1444 assert(impl);
1445 assert(queue);
1446
1447 int errsv = errno;
1448
1449 int wouldblock = 0;
1450
1451 struct slnode *node;
1452 while ((node = sllist_first(&impl->read_queue))) {
1453 struct ev_task *task = ev_task_from_node(node);
1454 struct io_can_chan_read *read =
1456
1457#if !LELY_NO_THREADS
1458 pthread_mutex_lock(&impl->c_mtx);
1459#endif
1460
1461 // Check if a frame is available in the receive queue.
1462 size_t n = 1;
1463 size_t i = spscring_c_alloc(&impl->rxring, &n);
1464 if (!n) {
1465#if !LELY_NO_THREADS
1466 pthread_mutex_unlock(&impl->c_mtx);
1467#endif
1468 wouldblock = 1;
1469 break;
1470 }
1471
1472 // Copy the frame from the buffer.
1473 struct io_can_frame *frame = &impl->rxbuf[i];
1474 void *data = &frame->frame;
1475 int is_err = can_frame2can_err(data, read->err);
1476 if (!is_err && read->msg) {
1477#if !LELY_NO_CANFD
1478 if (frame->nbytes == CANFD_MTU)
1479 canfd_frame2can_msg(data, read->msg);
1480 else
1481#endif
1482 can_frame2can_msg(data, read->msg);
1483 }
1484 if (read->tp)
1485 *read->tp = frame->ts;
1486 spscring_c_commit(&impl->rxring, 1);
1487
1488#if !LELY_NO_THREADS
1489 pthread_mutex_unlock(&impl->c_mtx);
1490#endif
1491
1492 read->r.result = !is_err;
1493 read->r.errc = 0;
1494
1496 sllist_push_back(queue, node);
1497 }
1498
1499 if (pwouldblock)
1500 *pwouldblock = wouldblock;
1501
1502 errno = errsv;
1503}
1504
1505static void
1506io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl, struct sllist *queue,
1507 const struct can_msg *msg)
1508{
1509 assert(impl);
1510 assert(queue);
1511 assert(msg);
1512
1513 // Find the matching write operation.
1514 struct slnode *node = sllist_first(&impl->confirm_queue);
1515 while (node) {
1517 ev_task_from_node(node));
1518 if (!can_msg_cmp(msg, write->msg))
1519 break;
1520 node = node->next;
1521 }
1522 if (!node)
1523 return;
1524
1525 // Complete the matching write operation. Any preceding write operations
1526 // waiting for confirmation are considered to have failed.
1527 struct ev_task *task;
1528 while ((task = ev_task_from_node(
1529 sllist_pop_front(&impl->confirm_queue)))) {
1530 sllist_push_front(queue, &task->_node);
1531 struct io_can_chan_write *write =
1533 if (&task->_node == node) {
1534 write->errc = 0;
1535 break;
1536 } else {
1537 write->errc = EIO;
1538 }
1539 }
1540}
1541
1542static size_t
1543io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl)
1544{
1545 assert(impl);
1546
1547 size_t n = 0;
1548
1549 // Try to abort io_can_chan_impl_rxbuf_task_func().
1550 // clang-format off
1551 if (impl->rxbuf_posted && ev_exec_abort(impl->rxbuf_task.exec,
1552 &impl->rxbuf_task)) {
1553 // clang-format on
1554 impl->rxbuf_posted = 0;
1555 n++;
1556 }
1557
1558 // Try to abort io_can_chan_impl_read_task_func().
1559 // clang-format off
1560 if (impl->read_posted && ev_exec_abort(impl->read_task.exec,
1561 &impl->read_task)) {
1562 // clang-format on
1563 impl->read_posted = 0;
1564 n++;
1565 }
1566
1567 // Try to abort io_can_chan_impl_write_task_func().
1568 // clang-format off
1569 if (impl->write_posted && ev_exec_abort(impl->write_task.exec,
1570 &impl->write_task)) {
1571 // clang-format on
1572 impl->write_posted = 0;
1573 n++;
1574 }
1575
1576 return n;
1577}
1578
1579static int
1580io_can_chan_impl_set_fd(struct io_can_chan_impl *impl, int fd, int flags)
1581{
1582 assert(impl);
1583 assert(!(flags & ~IO_CAN_BUS_FLAG_MASK));
1584
1585 struct sllist read_queue, write_queue, confirm_queue;
1586 sllist_init(&read_queue);
1587 sllist_init(&write_queue);
1588 sllist_init(&confirm_queue);
1589
1590#if !LELY_NO_THREADS
1591 pthread_mutex_lock(&impl->mtx);
1592#endif
1593
1594 if (impl->events) {
1595 impl->events = 0;
1596 // Stop monitoring I/O events.
1597 io_poll_watch(impl->poll, impl->fd, impl->events, &impl->watch);
1598 }
1599
1600#if !LELY_NO_THREADS
1601 pthread_mutex_lock(&impl->c_mtx);
1602#endif
1604 // Clear the receive queue.
1605 size_t n = SIZE_MAX;
1606 spscring_c_alloc(&impl->rxring, &n);
1607 if (n)
1608 spscring_c_commit(&impl->rxring, n);
1609#if !LELY_NO_THREADS
1610 pthread_mutex_unlock(&impl->c_mtx);
1611#endif
1612
1613 int tmp = impl->fd;
1614 impl->fd = fd;
1615 fd = tmp;
1616
1617 impl->flags = flags;
1618
1619 // Cancel pending operations.
1620 sllist_append(&read_queue, &impl->read_queue);
1621 sllist_append(&write_queue, &impl->write_queue);
1622 sllist_append(&confirm_queue, &impl->confirm_queue);
1623
1624 // Mark the ongoing write operation as canceled, if necessary.
1625 impl->current_write = NULL;
1626
1627#if !LELY_NO_THREADS
1628 pthread_mutex_unlock(&impl->mtx);
1629#endif
1630
1631 io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
1632 io_can_chan_write_queue_post(&write_queue, ECANCELED);
1633 io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
1634
1635 return fd;
1636}
1637
1638#endif // !LELY_NO_STDIO && __linux__
@ CAN_FLAG_FDF
The FD Format (FDF) flag, formerly known as Extended Data Length (EDL).
Definition: msg.h:54
@ CAN_FLAG_BRS
The Bit Rate Switch (BRS) flag (only available in CAN FD format frames).
Definition: msg.h:62
This is the internal header file of the SocketCAN rtnetlink attributes functions.
void io_can_chan_destroy(io_can_chan_t *chan)
Destroys a CAN channel.
Definition: can_chan.c:409
int io_can_chan_get_handle(const io_can_chan_t *chan)
Returns the SocketCAN file descriptor associated with a CAN channel, or -1 if the channel is closed.
Definition: can_chan.c:418
int io_can_chan_close(io_can_chan_t *chan)
Closes the SocketCAN file descriptor associated with a CAN channel.
Definition: can_chan.c:575
#define LELY_IO_CAN_RXLEN
The default SocketCAN receive queue length (in number of CAN frames).
Definition: can_chan.c:57
io_can_chan_t * io_can_chan_create(io_poll_t *poll, ev_exec_t *exec, size_t rxlen)
Creates a new CAN channel.
Definition: can_chan.c:382
int io_can_chan_open(io_can_chan_t *chan, const io_can_ctrl_t *ctrl, int flags)
Opens a CAN channel.
Definition: can_chan.c:433
int io_can_chan_release(io_can_chan_t *chan)
Dissociates and returns the SocketCAN file descriptor from a CAN channel.
Definition: can_chan.c:561
int io_can_chan_assign(io_can_chan_t *chan, int fd)
Assigns an existing SocketCAN file descriptor to a CAN channel.
Definition: can_chan.c:503
int io_can_chan_is_open(const io_can_chan_t *chan)
Returns 1 if the CAN channel is open and 0 if not.
Definition: can_chan.c:569
This is the internal header file of the SocketCAN error frame conversion functions.
This is the internal header file of the SocketCAN CAN frame conversion functions.
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
This header file is part of the utilities library; it contains the diagnostic declarations.
@ DIAG_WARNING
A warning.
Definition: diag.h:55
void diag(enum diag_severity severity, int errc, const char *format,...)
Emits a diagnostic message.
Definition: diag.c:171
@ IO_EVENT_IN
Data (other than priority data) MAY be read without blocking.
Definition: event.h:35
@ IO_EVENT_OUT
Data (bot normal and priority data) MAY be written without blocking.
Definition: event.h:46
@ IO_EVENT_ERR
An error has occurred. This event is always reported.
Definition: event.h:48
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
ssize_t io_fd_sendmsg(int fd, const struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX sendmsg(fd, msg, flags | MSG_NOSIGNAL), except that if fd is non-blocking (or the...
Definition: fd.c:116
ssize_t io_fd_recvmsg(int fd, struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX recvmsg(fd, msg, flags), except that if fd is non-blocking (or the implementation...
Definition: fd.c:80
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:93
struct io_can_chan_write * io_can_chan_write_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel write operation from a pointer to its completion task.
Definition: can.c:99
const struct io_can_ctrl_vtbl *const io_can_ctrl_t
An abstract CAN controller.
Definition: can.h:56
@ IO_CAN_BUS_FLAG_BRS
Bit Rate Switch support is enabled.
Definition: can.h:44
@ IO_CAN_BUS_FLAG_FDF
FD Format (formerly Extended Data Length) support is enabled.
Definition: can.h:42
@ IO_CAN_BUS_FLAG_ERR
Reception of error frames is enabled.
Definition: can.h:39
#define LELY_IO_RX_TIMEOUT
The default timeout (in milliseconds) for I/O read operations.
Definition: io2.h:31
#define LELY_IO_TX_TIMEOUT
The default timeout (in milliseconds) for I/O write operations.
Definition: io2.h:36
This header file is part of the I/O library; it contains the CAN bus declarations for Linux.
unsigned int io_can_ctrl_get_index(const io_can_ctrl_t *ctrl)
Returns the interface index of a CAN controller.
Definition: can_ctrl.c:191
int io_can_ctrl_get_flags(const io_can_ctrl_t *ctrl)
Returns the flags specifying which CAN bus features are enabled.
Definition: can_ctrl.c:199
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
int can_msg_cmp(const void *p1, const void *p2)
Compares two CAN or CAN FD format frames.
Definition: msg.c:30
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
This header file is part of the I/O library; it contains the I/O polling declarations for POSIX platf...
#define IO_POLL_WATCH_INIT(func)
The static initializer for io_poll_watch.
Definition: poll.h:65
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition: poll.c:275
int io_poll_watch(io_poll_t *poll, io_handle_t handle, struct io_event *event, int keep)
Registers an I/O device with an I/O polling interface and instructs it to watch for certain events.
Definition: poll.c:252
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
void sllist_push_front(struct sllist *list, struct slnode *node)
Pushes a node to the front of a singly-linked list.
Definition: sllist.h:221
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
struct slnode * sllist_first(const struct sllist *list)
Returns a pointer to the first node in a singly-linked list.
Definition: sllist.h:271
int can_msg2canfd_frame(const struct can_msg *src, struct canfd_frame *dst)
Converts a can_msg frame to a SocketCAN CAN FD frame.
Definition: can_msg.h:141
int can_frame2can_msg(const struct can_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN frame to a can_msg frame.
Definition: can_msg.h:51
int canfd_frame2can_msg(const struct canfd_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN FD frame to a can_msg frame.
Definition: can_msg.h:112
int can_msg2can_frame(const struct can_msg *src, struct can_frame *dst)
Converts a can_msg frame to a SocketCAN CAN frame.
Definition: can_msg.h:80
This header file is part of the utilities library; it contains the single-producer,...
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:262
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:327
size_t spscring_p_capacity(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer,...
Definition: spscring.c:65
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition: spscring.c:138
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition: spscring.c:302
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:98
This is the internal header file of the Windows-specific I/O declarations.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
An I/O polling interface.
Definition: poll.c:51
A CAN error frame.
Definition: err.h:28
A CAN or CAN FD format frame.
Definition: msg.h:87
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI...
Definition: msg.h:94
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
The implementation of a CAN channel.
Definition: can_chan.c:133
struct ev_task read_task
The task responsible for initiating read operations.
Definition: can_chan.c:155
int fd
The SocketCAN file descriptor.
Definition: can_chan.c:174
struct ev_task write_task
The task responsible for initiating write operations.
Definition: can_chan.c:157
struct io_can_frame * rxbuf
The receive queue.
Definition: can_chan.c:165
struct io_svc svc
The I/O service representing the channel.
Definition: can_chan.c:145
struct sllist read_queue
The queue containing pending read operations.
Definition: can_chan.c:188
struct ev_task rxbuf_task
The task responsible for filling the receive queue.
Definition: can_chan.c:153
io_poll_t * poll
A pointer to the polling instance used to watch for I/O events.
Definition: can_chan.c:143
unsigned write_posted
A flag indicating whether write_task has been posted to exec.
Definition: can_chan.c:186
int flags
The flags with which fd has been opened.
Definition: can_chan.c:176
unsigned read_posted
A flag indicating whether read_task has been posted to exec.
Definition: can_chan.c:184
pthread_mutex_t mtx
The mutex protecting the file descriptor, the flags and the queues of pending operations.
Definition: can_chan.c:171
struct sllist write_queue
The queue containing pending write operations.
Definition: can_chan.c:190
unsigned shutdown
A flag indicating whether the I/O service has been shut down.
Definition: can_chan.c:180
ev_exec_t * exec
A pointer to the executor used to execute all I/O tasks.
Definition: can_chan.c:149
const struct io_dev_vtbl * dev_vptr
A pointer to the virtual table for the I/O device interface.
Definition: can_chan.c:135
struct sllist confirm_queue
The queue containing write operations waiting to be confirmed.
Definition: can_chan.c:192
struct ev_task * current_write
The write operation currently being executed.
Definition: can_chan.c:194
pthread_mutex_t c_mtx
The mutex protecting the receive queue consumer.
Definition: can_chan.c:160
struct io_poll_watch watch
The object used to monitor the file descriptor for I/O events.
Definition: can_chan.c:151
const struct io_can_chan_vtbl * chan_vptr
A pointer to the virtual table for the CAN channel interface.
Definition: can_chan.c:137
io_ctx_t * ctx
A pointer to the I/O context with which the channel is registered.
Definition: can_chan.c:147
int events
The I/O events currently being monitored by poll for fd.
Definition: can_chan.c:178
struct spscring rxring
The ring buffer used to control the receive queue.
Definition: can_chan.c:163
unsigned rxbuf_posted
A flag indicating whether rxbuf_task has been posted to exec.
Definition: can_chan.c:182
int result
The result of the read operation: 1 if a CAN frame is received, 0 if an error frame is received,...
Definition: can.h:68
int errc
The error number, obtained as if by get_errc(), if result is -1.
Definition: can.h:70
A CAN channel read operation.
Definition: can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
struct io_can_chan_read_result r
The result of the read operation.
Definition: can.h:100
struct timespec * tp
The address at which to store the system time at which the CAN frame or CAN error frame was received.
Definition: can.h:93
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
A CAN channel write operation.
Definition: can.h:110
const struct can_msg * msg
A pointer to the CAN frame to be written.
Definition: can.h:116
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the write operation.
Definition: can.h:121
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: can.h:126
Definition: ctx.c:38
An object representing a file descriptor being monitored for I/O events.
Definition: poll.h:56
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
struct slnode * next
A pointer to the next node in the list.
Definition: sllist.h:42
A single-producer, single-consumer ring buffer.
Definition: spscring.h:63
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
size_t ev_task_queue_post(struct sllist *queue)
Post the tasks in queue to their respective executors and invokes ev_exec_on_task_fini() for each of ...
Definition: task.c:38
This header file is part of the C11 and POSIX compatibility library; it includes <unistd....
This header file is part of the utilities library; it contains the time function declarations.