Lely core libraries  2.3.4
can_chan.c
Go to the documentation of this file.
1 
24 #include "io.h"
25 
26 #if !LELY_NO_STDIO && defined(__linux__)
27 
28 #include "../can.h"
29 #include <lely/io2/ctx.h>
30 #include <lely/io2/linux/can.h>
31 #include <lely/io2/posix/poll.h>
32 #include <lely/util/diag.h>
33 #include <lely/util/spscring.h>
34 #include <lely/util/time.h>
35 #include <lely/util/util.h>
36 
37 #include <assert.h>
38 #include <errno.h>
39 #include <stdlib.h>
40 
41 #if !LELY_NO_THREADS
42 #include <pthread.h>
43 #endif
44 #include <unistd.h>
45 
46 #include <linux/can/raw.h>
47 #include <linux/sockios.h>
48 #include <sys/ioctl.h>
49 
50 #include "../posix/fd.h"
51 #include "can_attr.h"
52 #include "can_err.h"
53 #include "can_msg.h"
54 
// Default number of struct io_can_frame slots in the receive queue.
// io_can_chan_init() substitutes this value when it is called with rxlen == 0.
// May be overridden at build time by defining the macro beforehand.
55 #ifndef LELY_IO_CAN_RXLEN
56 #define LELY_IO_CAN_RXLEN 1024
58 #endif
59 
// A raw SocketCAN frame together with the metadata filled in by
// io_can_fd_read(). Used both for stack temporaries and as the element type
// of the receive buffer (rxbuf).
60 struct io_can_frame {
   // The frame storage; a CAN FD frame unless CAN FD support is disabled.
61 #if LELY_NO_CANFD
62  struct can_frame frame;
63 #else
64  struct canfd_frame frame;
65 #endif
   // Number of bytes in `frame` (CAN_MTU or, with CAN FD, CANFD_MTU).
66  size_t nbytes;
   // Reception timestamp; { 0, 0 } for write confirmations.
67  struct timespec ts;
68 };
69 
70 static int io_can_fd_set_default(int fd);
71 #if LELY_NO_CANFD
72 static int io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes,
73  int *pflags, struct timespec *tp, int timeout);
74 #else
75 static int io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes,
76  int *pflags, struct timespec *tp, int timeout);
77 #endif
78 #if LELY_NO_CANFD
79 static int io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
80  int dontwait);
81 #else
82 static int io_can_fd_write(int fd, const struct canfd_frame *frame,
83  size_t nbytes, int timeout);
84 #endif
85 static int io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout);
86 
87 static io_ctx_t *io_can_chan_impl_dev_get_ctx(const io_dev_t *dev);
88 static ev_exec_t *io_can_chan_impl_dev_get_exec(const io_dev_t *dev);
89 static size_t io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task);
90 static size_t io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task);
91 
92 // clang-format off
// Virtual table for the I/O device interface of the CAN channel. The entries
// are the get_ctx, get_exec, cancel and abort implementations defined below.
93 static const struct io_dev_vtbl io_can_chan_impl_dev_vtbl = {
94  &io_can_chan_impl_dev_get_ctx,
95  &io_can_chan_impl_dev_get_exec,
96  &io_can_chan_impl_dev_cancel,
97  &io_can_chan_impl_dev_abort
98 };
99 // clang-format on
100 
101 static io_dev_t *io_can_chan_impl_get_dev(const io_can_chan_t *chan);
102 static int io_can_chan_impl_get_flags(const io_can_chan_t *chan);
103 static int io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
104  struct can_err *err, struct timespec *tp, int timeout);
105 static void io_can_chan_impl_submit_read(
106  io_can_chan_t *chan, struct io_can_chan_read *read);
107 static int io_can_chan_impl_write(
108  io_can_chan_t *chan, const struct can_msg *msg, int timeout);
109 static void io_can_chan_impl_submit_write(
110  io_can_chan_t *chan, struct io_can_chan_write *write);
111 
112 // clang-format off
// Virtual table for the CAN channel interface: device lookup, flag query,
// blocking read/write and asynchronous (submitted) read/write.
113 static const struct io_can_chan_vtbl io_can_chan_impl_vtbl = {
114  &io_can_chan_impl_get_dev,
115  &io_can_chan_impl_get_flags,
116  &io_can_chan_impl_read,
117  &io_can_chan_impl_submit_read,
118  &io_can_chan_impl_write,
119  &io_can_chan_impl_submit_write
120 };
121 // clang-format on
122 
123 static void io_can_chan_impl_svc_shutdown(struct io_svc *svc);
124 
125 // clang-format off
// Virtual table for the I/O service interface. Only the shutdown handler is
// provided; the first slot is left NULL (its meaning is defined by
// struct io_svc_vtbl, which is not visible in this file).
126 static const struct io_svc_vtbl io_can_chan_impl_svc_vtbl = {
127  NULL,
128  &io_can_chan_impl_svc_shutdown
129 };
130 // clang-format on
131 
   // NOTE(review): this listing is missing the opening line of the struct
   // (presumably `struct io_can_chan_impl {`) and several members that the code
   // below uses (chan_vptr, poll, ctx, exec, watch, the three tasks, rxbuf, the
   // task queues and current_write). Only the surviving members are annotated.

   // Pointer to the I/O device vtable (set to &io_can_chan_impl_dev_vtbl).
135  const struct io_dev_vtbl *dev_vptr;
   // I/O service node registered with the context in io_can_chan_init().
145  struct io_svc svc;
158 #if !LELY_NO_THREADS
   // Held around the consumer-side ring buffer calls (spscring_c_*()).
159  pthread_mutex_t c_mtx;
161 #endif
   // Single-producer, single-consumer ring indexing the receive buffer.
162  struct spscring rxring;
166 #if !LELY_NO_THREADS
167 
   // Held while accessing fd, flags, events, the flag bits below and the
   // task queues.
171  pthread_mutex_t mtx;
172 #endif
   // The SocketCAN file descriptor, or -1 if no socket is attached.
173  int fd;
   // The bus flags of the channel (tested against IO_CAN_BUS_FLAG_FDF/BRS).
176  int flags;
   // The I/O events currently registered with io_poll_watch().
178  int events;
   // Set once io_can_chan_impl_svc_shutdown() has run.
180  unsigned shutdown : 1;
   // Set while io_can_chan_impl_rxbuf_task_func() is posted or running.
182  unsigned rxbuf_posted : 1;
   // Set while io_can_chan_impl_read_task_func() is posted or running.
184  unsigned read_posted : 1;
   // Set while io_can_chan_impl_write_task_func() is posted or running.
186  unsigned write_posted : 1;
195 };
196 
197 static void io_can_chan_impl_watch_func(
198  struct io_poll_watch *watch, int events);
199 static void io_can_chan_impl_rxbuf_task_func(struct ev_task *task);
200 static void io_can_chan_impl_read_task_func(struct ev_task *task);
201 static void io_can_chan_impl_write_task_func(struct ev_task *task);
202 
203 static inline struct io_can_chan_impl *io_can_chan_impl_from_dev(
204  const io_dev_t *dev);
205 static inline struct io_can_chan_impl *io_can_chan_impl_from_chan(
206  const io_can_chan_t *chan);
207 static inline struct io_can_chan_impl *io_can_chan_impl_from_svc(
208  const struct io_svc *svc);
209 
210 static void io_can_chan_impl_c_signal(struct spscring *ring, void *arg);
211 
212 static void io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
213  struct sllist *read_queue, struct sllist *write_queue,
214  struct sllist *confirm_queue, struct ev_task *task);
215 
216 static void io_can_chan_impl_do_read(struct io_can_chan_impl *impl,
217  struct sllist *queue, int *pwouldblock);
218 static void io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl,
219  struct sllist *queue, const struct can_msg *msg);
220 
221 static size_t io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl);
222 
223 static int io_can_chan_impl_set_fd(
224  struct io_can_chan_impl *impl, int fd, int flags);
225 
// Allocates (but does not initialize) the storage for a CAN channel.
//
// Returns a pointer to the chan_vptr member of a freshly malloc'ed
// struct io_can_chan_impl (i.e. the value used as the io_can_chan_t handle),
// or NULL if malloc() fails. Release it with io_can_chan_free().
226 void *
227 io_can_chan_alloc(void)
228 {
229  struct io_can_chan_impl *impl = malloc(sizeof(*impl));
230  if (!impl)
231  return NULL;
232  // Suppress a GCC maybe-uninitialized warning.
233  impl->chan_vptr = NULL;
234  // cppcheck-suppress memleak symbolName=impl
235  return &impl->chan_vptr;
236 }
237 
// Frees the storage obtained from io_can_chan_alloc().
//
// `ptr` is the channel handle (pointer to chan_vptr); it is converted back to
// the enclosing struct io_can_chan_impl before being passed to free(). A NULL
// pointer is ignored.
238 void
239 io_can_chan_free(void *ptr)
240 {
241  if (ptr)
242  free(io_can_chan_impl_from_chan(ptr));
243 }
244 
// Initializes a CAN channel in storage obtained from io_can_chan_alloc().
// (The return-type line of this definition is missing from the listing.)
//
// chan:  the channel handle to initialize.
// poll:  the I/O polling instance used to watch the socket; may be NULL, in
//        which case no I/O context is associated with the channel.
// exec:  the executor on which the internal rxbuf/read/write tasks run.
// rxlen: the number of frames in the receive queue; 0 selects
//        LELY_IO_CAN_RXLEN.
//
// Returns `chan` on success. On failure, everything initialized so far is
// released, errno is set to the saved error code and NULL is returned.
246 io_can_chan_init(io_can_chan_t *chan, io_poll_t *poll, ev_exec_t *exec,
247  size_t rxlen)
248 {
249  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
250  io_ctx_t *ctx = poll ? io_poll_get_ctx(poll) : NULL;
251 
252  if (!rxlen)
253  rxlen = LELY_IO_CAN_RXLEN;
254 
255  int errsv = 0;
256 
   // Install the virtual tables for the device and channel interfaces.
257  impl->dev_vptr = &io_can_chan_impl_dev_vtbl;
258  impl->chan_vptr = &io_can_chan_impl_vtbl;
259 
260  impl->poll = poll;
261 
262  impl->svc = (struct io_svc)IO_SVC_INIT(&io_can_chan_impl_svc_vtbl);
263  impl->ctx = ctx;
264 
265  impl->exec = exec;
266 
267  impl->watch = (struct io_poll_watch)IO_POLL_WATCH_INIT(
268  &io_can_chan_impl_watch_func);
269 
   // The three internal tasks all run on the channel's executor.
270  impl->rxbuf_task = (struct ev_task)EV_TASK_INIT(
271  impl->exec, &io_can_chan_impl_rxbuf_task_func);
272  impl->read_task = (struct ev_task)EV_TASK_INIT(
273  impl->exec, &io_can_chan_impl_read_task_func);
274  impl->write_task = (struct ev_task)EV_TASK_INIT(
275  impl->exec, &io_can_chan_impl_write_task_func);
276 
277 #if !LELY_NO_THREADS
   // pthread_mutex_init() returns the error number directly.
278  if ((errsv = pthread_mutex_init(&impl->c_mtx, NULL)))
279  goto error_init_c_mtx;
280 #endif
281 
   // Create the receive queue: a ring of rxlen indices plus the frame array.
282  spscring_init(&impl->rxring, rxlen);
283  impl->rxbuf = calloc(rxlen, sizeof(struct io_can_frame));
284  if (!impl->rxbuf) {
285  errsv = errno;
286  goto error_alloc_rxbuf;
287  }
288 
289 #if !LELY_NO_THREADS
290  if ((errsv = pthread_mutex_init(&impl->mtx, NULL)))
291  goto error_init_mtx;
292 #endif
293 
   // No socket is attached yet.
294  impl->fd = -1;
295  impl->flags = 0;
296  impl->events = 0;
297 
298  impl->shutdown = 0;
299  impl->rxbuf_posted = 0;
300  impl->read_posted = 0;
301  impl->write_posted = 0;
302 
303  sllist_init(&impl->read_queue);
304  sllist_init(&impl->write_queue);
305  sllist_init(&impl->confirm_queue);
306  impl->current_write = NULL;
307 
   // Register the service with the I/O context, if there is one.
308  if (impl->ctx)
309  io_ctx_insert(impl->ctx, &impl->svc);
310 
311  return chan;
312 
   // Error unwinding, in reverse order of initialization. Nothing can fail
   // after `mtx` has been initialized, so no path needs to destroy it; the
   // call is kept (commented out) only to document the symmetry.
313 #if !LELY_NO_THREADS
314  // pthread_mutex_destroy(&impl->mtx);
315 error_init_mtx:
316 #endif
317  free(impl->rxbuf);
318 error_alloc_rxbuf:
319 #if !LELY_NO_THREADS
320  pthread_mutex_destroy(&impl->c_mtx);
321 error_init_c_mtx:
322 #endif
323  errno = errsv;
324  return NULL;
325 }
326 
// Finalizes a CAN channel initialized with io_can_chan_init().
//
// Unregisters the service from the I/O context, cancels all pending
// operations, waits for the internal tasks to finish, closes the socket (if
// any) and releases the mutexes and the receive buffer. The channel storage
// itself is not freed; see io_can_chan_free().
327 void
328 io_can_chan_fini(io_can_chan_t *chan)
329 {
330  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
331 
332  if (impl->ctx)
333  io_ctx_remove(impl->ctx, &impl->svc);
334  // Cancel all pending operations.
335  io_can_chan_impl_svc_shutdown(&impl->svc);
336 
337 #if !LELY_NO_THREADS
338  // Abort any consumer wait operation running in a task.
   // NOTE(review): listing line 340, the statement executed while c_mtx is
   // held, is missing from this listing.
339  pthread_mutex_lock(&impl->c_mtx);
341  pthread_mutex_unlock(&impl->c_mtx);
342 
   // Ensures the diagnostic below is emitted at most once.
343  int warning = 0;
344  pthread_mutex_lock(&impl->mtx);
345  // If necessary, busy-wait until io_can_chan_impl_rxbuf_task_func(),
346  // io_can_chan_impl_read_task_func() and
347  // io_can_chan_impl_write_task_func() complete.
348  while (impl->rxbuf_posted || impl->read_posted || impl->write_posted) {
   // If at least one task could be aborted, re-check the flags right away.
349  if (io_can_chan_impl_do_abort_tasks(impl))
350  continue;
   // Otherwise release the mutex so the running task(s) can make progress.
351  pthread_mutex_unlock(&impl->mtx);
352  if (!warning) {
353  warning = 1;
354  diag(DIAG_WARNING, 0,
355  "io_can_chan_fini() invoked with pending operations");
356  }
357  sched_yield();
358  pthread_mutex_lock(&impl->mtx);
359  }
360  pthread_mutex_unlock(&impl->mtx);
361 #endif
362 
363  // Close the socket.
364  if (impl->fd != -1) {
   // Stop watching the descriptor before it is closed.
365  if (impl->events)
366  io_poll_watch(impl->poll, impl->fd, 0, &impl->watch);
367  close(impl->fd);
368  }
369 
370 #if !LELY_NO_THREADS
371  pthread_mutex_destroy(&impl->mtx);
372 #endif
373 
374  free(impl->rxbuf);
375 
376 #if !LELY_NO_THREADS
377  pthread_mutex_destroy(&impl->c_mtx);
378 #endif
379 }
380 
// NOTE(review): the signature of this function (listing lines 381-382) is
// missing; presumably io_can_chan_create(poll, exec, rxlen) -- confirm.
//
// Allocates a CAN channel with io_can_chan_alloc() and initializes it with
// io_can_chan_init(chan, poll, exec, rxlen). Returns the channel on success.
// On failure the allocation is released, errno is set to the error saved at
// the point of failure and NULL is returned.
383 {
384  int errsv = 0;
385 
386  io_can_chan_t *chan = io_can_chan_alloc();
387  if (!chan) {
388  errsv = errno;
389  goto error_alloc;
390  }
391 
   // Keep `chan` intact until init succeeds so it can still be freed below.
392  io_can_chan_t *tmp = io_can_chan_init(chan, poll, exec, rxlen);
393  if (!tmp) {
394  errsv = errno;
395  goto error_init;
396  }
397  chan = tmp;
398 
399  return chan;
400 
401 error_init:
402  io_can_chan_free((void *)chan);
403 error_alloc:
404  errno = errsv;
405  return NULL;
406 }
407 
// NOTE(review): the line with the function name and parameter list (listing
// line 409) is missing; presumably io_can_chan_destroy(io_can_chan_t *chan).
//
// Finalizes and frees a CAN channel. A NULL `chan` is ignored.
408 void
410 {
411  if (chan) {
412  io_can_chan_fini(chan);
413  io_can_chan_free((void *)chan);
414  }
415 }
416 
// NOTE(review): the line with the function name and parameter list (listing
// line 418) is missing; the call in the is-open check further down suggests
// this is io_can_chan_get_handle(chan).
//
// Returns the file descriptor currently attached to the channel, or -1 if
// there is none. The value is read while holding `mtx`.
417 int
419 {
420  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
421 
   // `impl` is const here; the cast only removes const so the mutex can be
   // locked.
422 #if !LELY_NO_THREADS
423  pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
424 #endif
425  int fd = impl->fd;
426 #if !LELY_NO_THREADS
427  pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
428 #endif
429  return fd;
430 }
431 
// NOTE(review): the line with the function name and parameter list (listing
// line 433) is missing; presumably io_can_chan_open(chan, ctrl, flags).
//
// Opens a raw SocketCAN socket, binds it to the interface of the CAN
// controller `ctrl`, configures it according to `flags` and attaches it to
// the channel.
//
// Returns 0 on success, or -1 on error with errno set.
432 int
434 {
435  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
436 
   // NOTE(review): listing line 437, the condition guarding this EINVAL
   // early return, is missing from this listing.
438  errno = EINVAL;
439  return -1;
440  }
441 
442  int errsv = 0;
443 
444  int fd = socket(AF_CAN, SOCK_RAW | SOCK_CLOEXEC, CAN_RAW);
445  if (fd == -1) {
446  errsv = errno;
447  goto error_socket;
448  }
449 
   // Bind the socket to the network interface of the controller.
450  struct sockaddr_can addr = { .can_family = AF_CAN,
451  .can_ifindex = io_can_ctrl_get_index(ctrl) };
452 
453  if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
454  errsv = errno;
455  goto error_bind;
456  }
457 
   // Enable the reception of error frames, if requested.
458  if (flags & IO_CAN_BUS_FLAG_ERR) {
459  can_err_mask_t optval = CAN_ERR_MASK;
460  // clang-format off
461  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
462  sizeof(optval)) == -1) {
463  // clang-format on
464  errsv = errno;
465  goto error_setsockopt;
466  }
467  }
468 
469 #if !LELY_NO_CANFD
   // Enable CAN FD frames, if requested.
470  if (flags & IO_CAN_BUS_FLAG_FDF) {
471  int optval = 1;
472  // clang-format off
473  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
474  sizeof(optval)) == -1) {
475  // clang-format on
476  errsv = errno;
477  goto error_setsockopt;
478  }
479  }
480 #endif
481 
482  if (io_can_fd_set_default(fd) == -1) {
483  errsv = errno;
484  goto error_set_default;
485  }
486 
   // Attach the new socket. io_can_chan_impl_set_fd() returns a descriptor
   // (presumably the one previously attached -- confirm against its
   // definition), which is closed here unless it is -1.
487  fd = io_can_chan_impl_set_fd(impl, fd, flags);
488  if (fd != -1)
489  close(fd);
490 
491  return 0;
492 
   // All failures after socket() only need to close the new descriptor.
493 error_set_default:
494 error_setsockopt:
495 error_bind:
496  close(fd);
497 error_socket:
498  errno = errsv;
499  return -1;
500 }
501 
// NOTE(review): the line with the function name and parameter list (listing
// line 503) is missing; presumably io_can_chan_assign(chan, fd).
//
// Attaches an existing SocketCAN descriptor `fd` to the channel. The bus
// flags are derived from the attributes of the interface the socket is bound
// to and from the socket's current options.
//
// Returns 0 on success, or -1 on error with errno set. errno is ENODEV if
// the descriptor is not bound to an AF_CAN address.
502 int
504 {
505  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
506 
   // Determine the interface the socket is bound to.
507  struct sockaddr_can addr = { .can_family = AF_UNSPEC };
508  socklen_t addrlen = sizeof(addr);
509  if (getsockname(fd, (struct sockaddr *)&addr, &addrlen) == -1)
510  return -1;
511  if (addrlen < sizeof(addr) || addr.can_family != AF_CAN) {
512  errno = ENODEV;
513  return -1;
514  }
515  unsigned int ifindex = addr.can_ifindex;
516 
   // Start from the flags reported for the interface.
517  struct io_can_attr attr = IO_CAN_ATTR_INIT;
518  if (io_can_attr_get(&attr, ifindex) == -1)
519  return -1;
520  int flags = attr.flags;
521 
   // Error frames are reported if any bit of the error filter is set.
522  {
523  can_err_mask_t optval = 0;
524  socklen_t optlen = sizeof(optval);
525  // clang-format off
526  if (getsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
527  &optlen) == -1)
528  // clang-format on
529  return -1;
530  if (optval & CAN_ERR_MASK)
531  flags |= IO_CAN_BUS_FLAG_ERR;
532  }
533 
534 #if !LELY_NO_CANFD
535  // Check if CAN FD frames are allowed. If the check fails, we assume
536  // they are not.
537  {
   // errno is saved and restored around this probe.
538  int errsv = errno;
539  int optval = 0;
540  socklen_t optlen = sizeof(optval);
541  // clang-format off
542  if (!getsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
543  &optlen) && !optval)
544  // clang-format on
   // NOTE(review): listing line 545, the statement controlled by the `if`
   // above, is missing from this listing.
546  errno = errsv;
547  }
548 #endif
549 
550  if (io_can_fd_set_default(fd) == -1)
551  return -1;
552 
   // Attach the socket. io_can_chan_impl_set_fd() returns a descriptor
   // (presumably the one previously attached -- confirm against its
   // definition), which is closed here unless it is -1.
553  fd = io_can_chan_impl_set_fd(impl, fd, flags);
554  if (fd != -1)
555  close(fd);
556 
557  return 0;
558 }
559 
// NOTE(review): the line with the function name and parameter list (listing
// line 561) is missing; the call in the close routine below suggests this is
// io_can_chan_release(chan).
//
// Detaches the socket from the channel by installing -1 as the descriptor
// (with flags 0) and returns the result of io_can_chan_impl_set_fd(). The
// descriptor is not closed here.
560 int
562 {
563  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
564 
565  return io_can_chan_impl_set_fd(impl, -1, 0);
566 }
567 
// NOTE(review): the line with the function name and parameter list (listing
// line 569) is missing; presumably io_can_chan_is_open(chan).
//
// Returns 1 if a descriptor is attached to the channel (handle != -1),
// 0 otherwise.
568 int
570 {
571  return io_can_chan_get_handle(chan) != -1;
572 }
573 
// NOTE(review): the line with the function name and parameter list (listing
// line 575) is missing; presumably io_can_chan_close(chan).
//
// Releases the descriptor from the channel and closes it. Returns the result
// of close(), or 0 if no descriptor was attached.
574 int
576 {
577  int fd = io_can_chan_release(chan);
578  return fd != -1 ? close(fd) : 0;
579 }
580 
// Applies the socket options this implementation relies on to the SocketCAN
// descriptor `fd`: local loopback, reception of own messages and a minimal
// send buffer.
//
// Returns 0 on success, or -1 as soon as one setsockopt() call fails (errno
// is left as set by that call).
581 static int
582 io_can_fd_set_default(int fd)
583 {
584  int optval;
585 
586  // Enable local loopback.
587  optval = 1;
588  // clang-format off
589  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_LOOPBACK, &optval,
590  sizeof(optval)) == -1)
591  // clang-format on
592  return -1;
593 
594  // Enable the reception of CAN frames sent by this socket so we can
595  // check for successful transmission.
596  optval = 1;
597  // clang-format off
598  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_RECV_OWN_MSGS, &optval,
599  sizeof(optval)) == -1)
600  // clang-format on
601  return -1;
602 
603  // Set the size of the send buffer to its minimum value. This causes
604  // write operations to block (or return EAGAIN) instead of returning
605  // ENOBUFS.
606  optval = 0;
607  // clang-format off
608  if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval))
609  == -1)
610  // clang-format on
611  return -1;
612 
613  return 0;
614 }
615 
// Reads a single CAN (or CAN FD) frame from the SocketCAN descriptor `fd`.
//
// frame:   receives the raw frame.
// pnbytes: if not NULL, receives the number of bytes read (CAN_MTU or, with
//          CAN FD support, CANFD_MTU).
// pflags:  if not NULL, receives msg_flags as reported by the receive call
//          (callers test it for MSG_CONFIRM).
// tp:      if not NULL, receives the frame timestamp obtained with the
//          SIOCGSTAMP ioctl, or { 0, 0 } for a write confirmation.
// timeout: passed to io_fd_recvmsg(); its exact semantics are defined there.
//
// Returns 0 on success, the negative result of io_fd_recvmsg() if receiving
// fails, or -1 if the timestamp ioctl fails.
616 static int
617 #if LELY_NO_CANFD
618 io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes, int *pflags,
619  struct timespec *tp, int timeout)
620 #else
621 io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes, int *pflags,
622  struct timespec *tp, int timeout)
623 #endif
624 {
625  struct iovec iov = { .iov_base = (void *)frame,
626  .iov_len = sizeof(*frame) };
627  struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
628 
629  ssize_t result;
   // Keep receiving until a datagram with a valid frame size arrives;
   // datagrams of any other size are silently discarded.
630  for (;;) {
631  result = io_fd_recvmsg(fd, &msg, 0, timeout);
632  if (result < 0)
633  return result;
634 #if LELY_NO_CANFD
635  if (result == CAN_MTU)
636 #else
637  if (result == CAN_MTU || result == CANFD_MTU)
638 #endif
639  break;
   // A positive timeout is only used for the first attempt.
640  if (timeout > 0)
641  timeout = 0;
642  }
643 
644  if (pnbytes)
645  *pnbytes = result;
646 
647  if (pflags)
648  *pflags = msg.msg_flags;
649 
650  if (tp) {
651  if (msg.msg_flags & MSG_CONFIRM) {
652  // Ignore the timestamp for write confirmations.
653  *tp = (struct timespec){ 0, 0 };
654  } else {
655  struct timeval tv = { 0, 0 };
656  if (ioctl(fd, SIOCGSTAMP, &tv) == -1)
657  return -1;
   // Convert microseconds to nanoseconds.
658  tp->tv_sec = tv.tv_sec;
659  tp->tv_nsec = tv.tv_usec * 1000;
660  }
661  }
662 
663  return 0;
664 }
665 
// Writes the first `nbytes` bytes of a raw SocketCAN frame to `fd` as a
// single message.
//
// timeout is passed to io_fd_sendmsg(); its exact semantics are defined there.
//
// Returns 0 if io_fd_sendmsg() reports a positive byte count, -1 otherwise.
666 static int
667 #if LELY_NO_CANFD
668 io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
669  int timeout)
670 #else
671 io_can_fd_write(int fd, const struct canfd_frame *frame, size_t nbytes,
672  int timeout)
673 #endif
674 {
   // The cast drops const only because iov_base is a non-const pointer.
675  struct iovec iov = { .iov_base = (void *)frame, .iov_len = nbytes };
676  struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
677 
678  return io_fd_sendmsg(fd, &msg, 0, timeout) > 0 ? 0 : -1;
679 }
680 
// Converts a generic CAN message to the SocketCAN frame format and writes it
// to `fd`.
//
// msg:     the message to send; must not be NULL.
// timeout: passed through to io_can_fd_write().
//
// Returns the result of io_can_fd_write(), or -1 with errno set to EINVAL if
// the message cannot be converted.
681 static int
682 io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout)
683 {
684  assert(msg);
685 
686  // Convert the frame to the SocketCAN format.
687  struct io_can_frame frame;
688 #if !LELY_NO_CANFD
   // Messages flagged as CAN FD are sent as full CAN FD frames.
689  if (msg->flags & CAN_FLAG_FDF) {
690  if (can_msg2canfd_frame(msg, &frame.frame) == -1) {
691  errno = EINVAL;
692  return -1;
693  }
694  frame.nbytes = CANFD_MTU;
695  } else {
696 #endif
   // Classic frames are built in the same storage, viewed as a
   // struct can_frame, and only CAN_MTU bytes of it are sent.
697  if (can_msg2can_frame(msg, (struct can_frame *)&frame.frame)
698  == -1) {
699  errno = EINVAL;
700  return -1;
701  }
702  frame.nbytes = CAN_MTU;
703 #if !LELY_NO_CANFD
704  }
705 #endif
706 
707  return io_can_fd_write(fd, &frame.frame, frame.nbytes, timeout);
708 }
709 
// Device interface: returns the I/O context stored in the channel (NULL if
// the channel was initialized without a polling instance).
710 static io_ctx_t *
711 io_can_chan_impl_dev_get_ctx(const io_dev_t *dev)
712 {
713  const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
714 
715  return impl->ctx;
716 }
717 
// Device interface: returns the executor passed to io_can_chan_init().
718 static ev_exec_t *
719 io_can_chan_impl_dev_get_exec(const io_dev_t *dev)
720 {
721  const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
722 
723  return impl->exec;
724 }
725 
// Device interface: cancels pending operations on the channel.
//
// task: the operation to cancel, or NULL to cancel all of them.
//
// Matching tasks are moved from the channel's queues to local queues while
// holding `mtx`, then completed with ECANCELED after the lock is released.
// Returns the number of canceled operations, saturated at SIZE_MAX.
726 static size_t
727 io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task)
728 {
729  struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
730 
731  size_t n = 0;
732 
733  struct sllist read_queue, write_queue, confirm_queue;
734  sllist_init(&read_queue);
735  sllist_init(&write_queue);
736  sllist_init(&confirm_queue);
737 
738 #if !LELY_NO_THREADS
739  pthread_mutex_lock(&impl->mtx);
740 #endif
741  io_can_chan_impl_do_pop(
742  impl, &read_queue, &write_queue, &confirm_queue, task);
743  // Mark the ongoing write operation as canceled, if necessary.
744  if (impl->current_write && (!task || task == impl->current_write)) {
745  impl->current_write = NULL;
746  n++;
747  }
748 #if !LELY_NO_THREADS
749  pthread_mutex_unlock(&impl->mtx);
750 #endif
751 
   // Post the completions outside the lock; each addition saturates
   // instead of wrapping around.
752  size_t nread = io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
753  n = n < SIZE_MAX - nread ? n + nread : SIZE_MAX;
754  size_t nwrite = io_can_chan_write_queue_post(&write_queue, ECANCELED);
755  n = n < SIZE_MAX - nwrite ? n + nwrite : SIZE_MAX;
756  size_t nconfirm =
757  io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
758  n = n < SIZE_MAX - nconfirm ? n + nconfirm : SIZE_MAX;
759 
760  return n;
761 }
762 
// Device interface: aborts pending operations on the channel.
//
// task: the operation to abort, or NULL to abort all of them.
//
// Unlike cancel, read, write and confirmation tasks are all collected into a
// single local queue, which is handed to ev_task_queue_abort() after `mtx` is
// released. Returns the value reported by ev_task_queue_abort().
763 static size_t
764 io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task)
765 {
766  struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
767 
768  struct sllist queue;
769  sllist_init(&queue);
770 
771 #if !LELY_NO_THREADS
772  pthread_mutex_lock(&impl->mtx);
773 #endif
774  io_can_chan_impl_do_pop(impl, &queue, &queue, &queue, task);
775 #if !LELY_NO_THREADS
776  pthread_mutex_unlock(&impl->mtx);
777 #endif
778 
779  return ev_task_queue_abort(&queue);
780 }
// Channel interface: returns the I/O device handle of the channel, i.e. the
// address of its dev_vptr member.
781 static io_dev_t *
782 io_can_chan_impl_get_dev(const io_can_chan_t *chan)
783 {
784  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
785 
786  return &impl->dev_vptr;
787 }
788 
// Channel interface: returns the bus flags of the channel, read while holding
// `mtx`.
789 static int
790 io_can_chan_impl_get_flags(const io_can_chan_t *chan)
791 {
792  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
793 
   // `impl` is const here; the cast only removes const so the mutex can be
   // locked.
794 #if !LELY_NO_THREADS
795  pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
796 #endif
797  int flags = impl->flags;
798 #if !LELY_NO_THREADS
799  pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
800 #endif
801  return flags;
802 }
803 
// Channel interface: synchronously reads a CAN frame or error frame.
//
// msg:     if not NULL, receives the frame when it is not an error frame.
// err:     passed to can_frame2can_err() to receive the error information.
// tp:      if not NULL, receives the timestamp of the frame.
// timeout: passed to io_can_fd_read() when reading from the socket; a
//          positive value is used for at most one attempt.
//
// A frame is taken from the receive queue if one is available; otherwise it
// is read directly from the socket. Write confirmations encountered on the
// way are processed and skipped.
//
// Returns 1 if a CAN frame was read, 0 if an error frame was read, or -1 on
// error (reading failed or can_frame2can_err() returned -1).
804 static int
805 io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
806  struct can_err *err, struct timespec *tp, int timeout)
807 {
808  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
809 
   // `frame` points at the local temporary unless a queued frame is found.
810  struct io_can_frame frame_;
811  struct io_can_frame *frame = &frame_;
812 #if !LELY_NO_THREADS
813  pthread_mutex_lock(&impl->c_mtx);
814 #endif
   // Number of frames claimed from the receive queue: 1 if the frame comes
   // from the queue, 0 if it was read directly from the socket.
815  size_t n = 0;
816  for (;;) {
817  // Check if a frame is available in the receive queue.
818  n = 1;
819  size_t i = spscring_c_alloc(&impl->rxring, &n);
820  if (n) {
   // c_mtx stays locked until the frame is committed below.
821  frame = &impl->rxbuf[i];
822  break;
823  }
824 #if !LELY_NO_THREADS
825  pthread_mutex_unlock(&impl->c_mtx);
826  pthread_mutex_lock(&impl->mtx);
827 #endif
828  // If not, read a frame directly.
829  int fd = impl->fd;
830 #if !LELY_NO_THREADS
831  pthread_mutex_unlock(&impl->mtx);
832 #endif
833  int flags = 0;
834  // clang-format off
835  if (io_can_fd_read(fd, &frame->frame, &frame->nbytes, &flags,
836  &frame->ts, timeout) < 0)
837  // clang-format on
   // No mutex is held at this point.
838  return -1;
839  // Process the frame unless it is a write confirmation.
840  if (!(flags & MSG_CONFIRM))
841  break;
842  // Convert the frame from the SocketCAN format.
   // Note: this local `msg` shadows the function parameter within the loop.
843  void *src = &frame->frame;
844  struct can_msg msg;
845 #if !LELY_NO_CANFD
846  if (frame->nbytes == CANFD_MTU)
847  canfd_frame2can_msg(src, &msg);
848  else
849 #endif
850  can_frame2can_msg(src, &msg);
851  // Process the write confirmation.
852  struct sllist queue;
853  sllist_init(&queue);
854 #if !LELY_NO_THREADS
855  pthread_mutex_lock(&impl->mtx);
856 #endif
857  io_can_chan_impl_do_confirm(impl, &queue, &msg);
858 #if !LELY_NO_THREADS
859  pthread_mutex_unlock(&impl->mtx);
860 #endif
861  ev_task_queue_post(&queue);
862  // Since the timeout is relative, we can only use a positive
863  // value once.
864  if (timeout > 0)
865  timeout = 0;
   // Re-acquire c_mtx before checking the receive queue again.
866 #if !LELY_NO_THREADS
867  pthread_mutex_lock(&impl->c_mtx);
868 #endif
869  }
870  // Parse the frame.
871  void *data = &frame->frame;
872  int is_err = can_frame2can_err(data, err);
873  if (!is_err && msg) {
874 #if !LELY_NO_CANFD
875  if (frame->nbytes == CANFD_MTU)
876  canfd_frame2can_msg(data, msg);
877  else
878 #endif
879  can_frame2can_msg(data, msg);
880  }
881  if (tp)
882  *tp = frame->ts;
883  // Remove the frame from the receive queue, if necessary.
884  if (n) {
885  spscring_c_commit(&impl->rxring, n);
886 #if !LELY_NO_THREADS
887  pthread_mutex_unlock(&impl->c_mtx);
888 #endif
889  }
890 
891  return is_err == -1 ? -1 : !is_err;
892 }
893 
// Channel interface: submits an asynchronous read operation.
//
// read: the read operation; must not be NULL. If its task has no executor,
//       the channel's executor is used.
//
// If the channel has been shut down, the operation is completed immediately
// with result -1 and ECANCELED. Otherwise it is appended to the read queue
// and the internal read task is posted when it is not already posted and the
// queue was empty.
894 static void
895 io_can_chan_impl_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
896 {
897  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
898  assert(read);
899  struct ev_task *task = &read->task;
900 
901  if (!task->exec)
902  task->exec = impl->exec;
903  assert(task->exec);
904  ev_exec_on_task_init(task->exec);
905 
906 #if !LELY_NO_THREADS
907  pthread_mutex_lock(&impl->mtx);
908 #endif
909  if (impl->shutdown) {
910 #if !LELY_NO_THREADS
911  pthread_mutex_unlock(&impl->mtx);
912 #endif
913  io_can_chan_read_post(read, -1, ECANCELED);
914  } else {
   // Decide whether to post before the push changes the queue's state.
915  int post_read = !impl->read_posted
916  && sllist_empty(&impl->read_queue);
917  sllist_push_back(&impl->read_queue, &task->_node);
918  if (post_read)
919  impl->read_posted = 1;
920 #if !LELY_NO_THREADS
921  pthread_mutex_unlock(&impl->mtx);
922 #endif
923  assert(impl->read_task.exec);
   // Post outside the lock.
924  if (post_read)
925  ev_exec_post(impl->read_task.exec, &impl->read_task);
926  }
927 }
928 
// Channel interface: synchronously writes a CAN frame.
//
// msg:     the message to send.
// timeout: passed through to io_can_fd_write_msg().
//
// With CAN FD support, the flags required by the message are checked against
// the channel's bus flags first; if the channel does not support them, -1 is
// returned with errno set to EINVAL. Otherwise returns the result of
// io_can_fd_write_msg().
929 static int
930 io_can_chan_impl_write(
931  io_can_chan_t *chan, const struct can_msg *msg, int timeout)
932 {
933  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
934 
935 #if !LELY_NO_CANFD
   // NOTE(review): listing lines 938 and 940, the statements controlled by
   // the two `if`s below, are missing from this listing. The equivalent code
   // in io_can_chan_impl_submit_write() sets IO_CAN_BUS_FLAG_FDF and
   // IO_CAN_BUS_FLAG_BRS in `flags`; presumably the same happens here.
936  int flags = 0;
937  if (msg->flags & CAN_FLAG_FDF)
939  if (msg->flags & CAN_FLAG_BRS)
941 #endif
942 
943 #if !LELY_NO_THREADS
944  pthread_mutex_lock(&impl->mtx);
945 #endif
946 #if !LELY_NO_CANFD
   // Every flag required by the message must be set on the channel.
947  if ((flags & impl->flags) != flags) {
948 #if !LELY_NO_THREADS
949  pthread_mutex_unlock(&impl->mtx);
950 #endif
951  errno = EINVAL;
952  return -1;
953  }
954 #endif
   // Copy the descriptor so the write itself runs without holding the lock.
955  int fd = impl->fd;
956 #if !LELY_NO_THREADS
957  pthread_mutex_unlock(&impl->mtx);
958 #endif
959 
960  return io_can_fd_write_msg(fd, msg, timeout);
961 }
962 
// Channel interface: submits an asynchronous write operation.
//
// write: the write operation; it and its message must not be NULL. If its
//        task has no executor, the channel's executor is used.
//
// The operation is completed immediately with ECANCELED if the channel has
// been shut down, or with EINVAL if (with CAN FD support) the message needs
// bus flags the channel does not have. Otherwise it is appended to the write
// queue and the internal write task is posted when it is not already posted
// and the queue was empty.
963 static void
964 io_can_chan_impl_submit_write(
965  io_can_chan_t *chan, struct io_can_chan_write *write)
966 {
967  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
968  assert(write);
969  assert(write->msg);
970  struct ev_task *task = &write->task;
971 
972 #if !LELY_NO_CANFD
   // Determine which bus flags the message requires.
973  int flags = 0;
974  if (write->msg->flags & CAN_FLAG_FDF)
975  flags |= IO_CAN_BUS_FLAG_FDF;
976  if (write->msg->flags & CAN_FLAG_BRS)
977  flags |= IO_CAN_BUS_FLAG_BRS;
978 #endif
979 
980  if (!task->exec)
981  task->exec = impl->exec;
982  assert(task->exec);
983  ev_exec_on_task_init(task->exec);
984 
985 #if !LELY_NO_THREADS
986  pthread_mutex_lock(&impl->mtx);
987 #endif
988  if (impl->shutdown) {
989 #if !LELY_NO_THREADS
990  pthread_mutex_unlock(&impl->mtx);
991 #endif
992  io_can_chan_write_post(write, ECANCELED);
993 #if !LELY_NO_CANFD
994  } else if ((flags & impl->flags) != flags) {
995 #if !LELY_NO_THREADS
996  pthread_mutex_unlock(&impl->mtx);
997 #endif
998  io_can_chan_write_post(write, EINVAL);
999 #endif
1000  } else {
   // Decide whether to post before the push changes the queue's state.
1001  int post_write = !impl->write_posted
1002  && sllist_empty(&impl->write_queue);
1003  sllist_push_back(&impl->write_queue, &task->_node);
1004  if (post_write)
1005  impl->write_posted = 1;
1006 #if !LELY_NO_THREADS
1007  pthread_mutex_unlock(&impl->mtx);
1008 #endif
1009  assert(impl->write_task.exec);
   // Post outside the lock.
1010  if (post_write)
1011  ev_exec_post(impl->write_task.exec, &impl->write_task);
1012  }
1013 }
1014 
// Service interface: shuts down the channel.
//
// Sets the shutdown flag. Only the first invocation has any further effect:
// it stops watching the socket for I/O events, tries to abort the internal
// tasks and cancels all pending operations. Later invocations do nothing.
1015 static void
1016 io_can_chan_impl_svc_shutdown(struct io_svc *svc)
1017 {
1018  struct io_can_chan_impl *impl = io_can_chan_impl_from_svc(svc);
1019  io_dev_t *dev = &impl->dev_vptr;
1020 
1021 #if !LELY_NO_THREADS
1022  pthread_mutex_lock(&impl->mtx);
1023 #endif
   // Non-zero only if the channel was not already shut down.
1024  int shutdown = !impl->shutdown;
1025  impl->shutdown = 1;
1026  if (shutdown) {
1027  if (impl->events) {
1028  impl->events = 0;
1029  // Stop monitoring I/O events.
1030  io_poll_watch(impl->poll, impl->fd, impl->events,
1031  &impl->watch);
1032  }
1033  // Try to abort io_can_chan_impl_rxbuf_task_func(),
1034  // io_can_chan_impl_read_task_func() and
1035  // io_can_chan_impl_write_task_func().
1036  io_can_chan_impl_do_abort_tasks(impl);
1037  }
1038 #if !LELY_NO_THREADS
1039  pthread_mutex_unlock(&impl->mtx);
1040 #endif
   // The cancellation takes `mtx` itself, so it must run after the unlock.
1041  // cppcheck-suppress duplicateCondition
1042  if (shutdown)
1043  // Cancel all pending operations.
1044  io_can_chan_impl_dev_cancel(dev, NULL);
1045 }
1046 
// I/O polling callback: invoked with the events reported for the socket.
//
// watch:  the watch object embedded in the channel; must not be NULL.
// events: the reported I/O events (IO_EVENT_* bits).
//
// Updates the set of events still being watched, then posts the rxbuf task
// for input (or error) events and the write task for output (or error)
// events, unless the respective task is already posted or the channel is
// shut down.
1047 static void
1048 io_can_chan_impl_watch_func(struct io_poll_watch *watch, int events)
1049 {
1050  assert(watch);
   // NOTE(review): listing line 1052, the initializer that obtains `impl`
   // from `watch`, is missing from this listing.
1051  struct io_can_chan_impl *impl =
1053 
1054 #if !LELY_NO_THREADS
1055  pthread_mutex_lock(&impl->mtx);
1056 #endif
1057  // Continue monitoring events that are not otherwise handled.
1058  if ((events & IO_EVENT_ERR) || impl->fd == -1 || impl->shutdown) {
1059  impl->events = 0;
1060  } else if ((impl->events &= ~events) != 0) {
   // errno is preserved across the re-registration attempt.
1061  int errsv = errno;
1062  // clang-format off
1063  if (io_poll_watch(impl->poll, impl->fd, impl->events,
1064  &impl->watch) == -1) {
1065  // clang-format on
   // Re-registration failed: treat it like an error event so both tasks
   // below get a chance to run.
1066  impl->events = 0;
1067  events |= IO_EVENT_ERR;
1068  }
1069  errno = errsv;
1070  }
1071 
1072  // Process incoming CAN frames.
1073  int post_rxbuf = 0;
1074  if ((events & (IO_EVENT_IN | IO_EVENT_ERR)) && !impl->shutdown) {
1075  post_rxbuf = !impl->rxbuf_posted;
1076  impl->rxbuf_posted = 1;
1077  }
1078 
1079  // Retry any pending write operations.
1080  int post_write = 0;
1081  if ((events & (IO_EVENT_OUT | IO_EVENT_ERR))
1082  && !sllist_empty(&impl->write_queue)
1083  && !impl->shutdown) {
1084  post_write = !impl->write_posted;
1085  impl->write_posted = 1;
1086  }
1087 #if !LELY_NO_THREADS
1088  pthread_mutex_unlock(&impl->mtx);
1089 #endif
1090 
   // Post the tasks outside the lock.
1091  if (post_rxbuf)
1092  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1093  if (post_write)
1094  ev_exec_post(impl->write_task.exec, &impl->write_task);
1095 }
1096 
// Internal task: reads frames from the socket into the receive queue and
// processes write confirmations.
//
// task: the rxbuf task embedded in the channel; must not be NULL.
//
// Frames are read while there are pending read operations and room in the
// receive queue, or while write confirmations are outstanding. On a hard
// error all pending read operations are completed with that error. If
// reading would block and a polling instance is available, the socket is
// watched for input instead of reposting this task. errno is preserved
// across the call.
1097 static void
1098 io_can_chan_impl_rxbuf_task_func(struct ev_task *task)
1099 {
1100  assert(task);
1101  struct io_can_chan_impl *impl =
1102  structof(task, struct io_can_chan_impl, rxbuf_task);
1103 
1104  int errsv = errno;
1105 
   // Completed tasks are collected here and posted after `mtx` is released.
1106  struct sllist queue;
1107  sllist_init(&queue);
1108 
1109  int result = 0;
1110  int errc = 0;
1111  int wouldblock = 0;
1112 
1113 #if !LELY_NO_THREADS
1114  pthread_mutex_lock(&impl->mtx);
1115 #endif
1116  // Only try to fill the receive queue if there are pending read
1117  // operations and there is an empty slot in the queue, or if there are
1118  // pending write confirmations.
1119  // clang-format off
1120  while ((!sllist_empty(&impl->read_queue)
1121  && spscring_p_capacity(&impl->rxring))
1122  || !sllist_empty(&impl->confirm_queue)) {
1123  // clang-format on
   // Copy the descriptor and drop the lock for the duration of the read.
1124  int fd = impl->fd;
1125 #if !LELY_NO_THREADS
1126  pthread_mutex_unlock(&impl->mtx);
1127 #endif
1128 
   // Fall back to a local temporary if the receive queue is full; in that
   // case n is 0 and nothing is committed below.
1129  struct io_can_frame frame_;
1130  struct io_can_frame *frame = &frame_;
1131  // Try to obtain an empty slot in the receive queue.
1132  size_t n = 1;
1133  size_t i = spscring_p_alloc(&impl->rxring, &n);
1134  if (n)
1135  frame = &impl->rxbuf[i];
1136 
1137  // Try to read a CAN or CAN FD format frame from the CAN bus.
   // With a polling instance the read uses a timeout of 0; without one it
   // uses LELY_IO_RX_TIMEOUT.
1138  int flags = 0;
1139  result = io_can_fd_read(fd, &frame->frame, &frame->nbytes,
1140  &flags, &frame->ts,
1141  impl->poll ? 0 : LELY_IO_RX_TIMEOUT);
1142  errc = !result ? 0 : errno;
1143  wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1144 
1145  // Convert the frame from the SocketCAN format if it is a write
1146  // confirmation.
1147  struct can_msg msg;
1148  if (!result && (flags & MSG_CONFIRM)) {
1149  void *src = &frame->frame;
1150 #if !LELY_NO_CANFD
1151  if (frame->nbytes == CANFD_MTU)
1152  canfd_frame2can_msg(src, &msg);
1153  else
1154 #endif
1155  can_frame2can_msg(src, &msg);
1156  }
1157 
1158  // Make the frame available for reading.
   // Write confirmations are never committed, so their slot is reused.
1159  if (!result && !(flags & MSG_CONFIRM))
1160  spscring_p_commit(&impl->rxring, n);
1161 
1162 #if !LELY_NO_THREADS
1163  pthread_mutex_lock(&impl->mtx);
1164 #endif
1165  // Process the write confirmation, if any.
1166  if (!result && flags & MSG_CONFIRM)
1167  io_can_chan_impl_do_confirm(impl, &queue, &msg);
1168 
1169  // Stop if the operation did or would block, or if an error
1170  // occurred.
1171  if (!impl->poll || result < 0)
1172  break;
1173  }
1174  // Cancel all pending read operations on error.
1175  if (result < 0 && !wouldblock) {
   // First complete the reads that can still be satisfied from the queue.
1176  io_can_chan_impl_do_read(impl, &queue, NULL);
1177  while ((task = ev_task_from_node(
1178  sllist_pop_front(&impl->read_queue)))) {
   // NOTE(review): listing line 1180, the initializer that obtains the
   // read operation from `task`, is missing from this listing.
1179  struct io_can_chan_read *read =
1181  read->r.result = result;
1182  read->r.errc = errc;
1183  sllist_push_back(&queue, &task->_node);
1184  }
1185  }
   // Repost only while work remains, a socket is attached and the channel
   // is not shut down.
1186  // clang-format off
1187  int post_rxbuf = !(sllist_empty(&impl->read_queue)
1188  && sllist_empty(&impl->confirm_queue))
1189  && impl->fd != -1 && !impl->shutdown;
1190  // clang-format on
1191  // If a read operation would block, start monitoring the file descriptor
1192  // for I/O events.
1193  if (post_rxbuf && impl->poll && wouldblock) {
1194  int events = impl->events | IO_EVENT_IN;
1195  // clang-format off
1196  if (!io_poll_watch(impl->poll, impl->fd, events,
1197  &impl->watch)) {
1198  // clang-format on
1199  impl->events = events;
1200  // Do not repost this task unless registering the file
1201  // descriptor fails.
1202  post_rxbuf = 0;
1203  }
1204  }
1205  impl->rxbuf_posted = post_rxbuf;
1206 #if !LELY_NO_THREADS
1207  pthread_mutex_unlock(&impl->mtx);
1208 #endif
1209 
1210  ev_task_queue_post(&queue);
1211 
1212  if (post_rxbuf)
1213  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1214 
1215  errno = errsv;
1216 }
1217 
// Internal task: completes pending read operations from the receive queue.
//
// task: the read task embedded in the channel; must not be NULL.
//
// Satisfies as many pending reads as possible. If reads remain and the
// receive queue is empty, a wait is registered on the ring buffer and, when
// nothing else is already fetching frames, the rxbuf task is posted.
1218 static void
1219 io_can_chan_impl_read_task_func(struct ev_task *task)
1220 {
1221  assert(task);
1222  struct io_can_chan_impl *impl =
1223  structof(task, struct io_can_chan_impl, read_task);
1224 
   // Completed reads are collected here and posted after `mtx` is released.
1225  struct sllist queue;
1226  sllist_init(&queue);
1227 
1228 #if !LELY_NO_THREADS
1229  pthread_mutex_lock(&impl->mtx);
1230 #endif
1231  // Process any pending read operations that can be satisfied.
1232  int wouldblock = 0;
1233  io_can_chan_impl_do_read(impl, &queue, &wouldblock);
1234  int post_rxbuf = 0;
1235  // Repost this task if any read operations remain in the queue.
1236  int post_read = !sllist_empty(&impl->read_queue) && !impl->shutdown;
1237  // Register a wait operation if the receive queue is empty.
1238  if (post_read && wouldblock) {
   // c_mtx is taken here while mtx is still held.
1239 #if !LELY_NO_THREADS
1240  pthread_mutex_lock(&impl->c_mtx);
1241 #endif
1242  // Do not repost this task unless the wait condition can be
1243  // satisfied immediately.
1244  post_read = !spscring_c_submit_wait(&impl->rxring, 1,
1245  io_can_chan_impl_c_signal, impl);
1246 #if !LELY_NO_THREADS
1247  pthread_mutex_unlock(&impl->c_mtx);
1248 #endif
1249  if (!post_read)
1250  // If the receive queue is empty, start reading more CAN
1251  // frames, unless we're already waiting for one.
1252  post_rxbuf = !impl->rxbuf_posted
1253  && !(impl->events & IO_EVENT_IN)
1254  && impl->fd != -1 && !impl->shutdown;
1255  }
1256  // cppcheck-suppress knownConditionTrueFalse
1257  if (post_rxbuf)
1258  impl->rxbuf_posted = 1;
1259  impl->read_posted = post_read;
1260 #if !LELY_NO_THREADS
1261  pthread_mutex_unlock(&impl->mtx);
1262 #endif
1263 
1264  ev_task_queue_post(&queue);
1265 
1266  // cppcheck-suppress knownConditionTrueFalse
1267  if (post_rxbuf)
1268  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1269 
1270  if (post_read)
1271  ev_exec_post(impl->read_task.exec, &impl->read_task);
1272 }
1273 
1274 static void
1275 io_can_chan_impl_write_task_func(struct ev_task *task)
1276 {
1277  assert(task);
1278  struct io_can_chan_impl *impl =
1279  structof(task, struct io_can_chan_impl, write_task);
1280 
1281  int errsv = errno;
1282 
1283  int wouldblock = 0;
1284 
1285 #if !LELY_NO_THREADS
1286  pthread_mutex_lock(&impl->mtx);
1287 #endif
1288  // Try to process all pending write operations at once, unless we're in
1289  // blocking mode.
1290  while ((task = impl->current_write = ev_task_from_node(
1291  sllist_pop_front(&impl->write_queue)))) {
1292  int fd = impl->fd;
1293 #if !LELY_NO_THREADS
1294  pthread_mutex_unlock(&impl->mtx);
1295 #endif
1296  struct io_can_chan_write *write =
1298  int result = io_can_fd_write_msg(fd, write->msg,
1299  impl->poll ? 0 : LELY_IO_TX_TIMEOUT);
1300  int errc = !result ? 0 : errno;
1301  wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1302  if (!wouldblock && errc)
1303  // The operation failed immediately.
1304  io_can_chan_write_post(write, errc);
1305 #if !LELY_NO_THREADS
1306  pthread_mutex_lock(&impl->mtx);
1307 #endif
1308  if (!errc)
1309  // Wait for the write confirmation.
1310  sllist_push_back(&impl->confirm_queue, &task->_node);
1311  if (task == impl->current_write) {
1312  // Put the write operation back on the queue if it would
1313  // block, unless it was canceled.
1314  if (wouldblock) {
1316  &task->_node);
1317  task = NULL;
1318  }
1319  impl->current_write = NULL;
1320  }
1321  assert(!impl->current_write);
1322  // Stop if the operation did or would block.
1323  if (!impl->poll || wouldblock)
1324  break;
1325  }
1326  // If we're waiting for a write confirmation, start reading more CAN
1327  // frames, unless we're already waiting for one.
1328  int post_rxbuf = !impl->rxbuf_posted
1329  && !sllist_empty(&impl->confirm_queue)
1330  && !(impl->events & IO_EVENT_IN) && impl->fd != -1
1331  && !impl->shutdown;
1332  if (post_rxbuf)
1333  impl->rxbuf_posted = 1;
1334  // Repost this task if any write operations remain in the queue.
1335  int post_write = !sllist_empty(&impl->write_queue) && impl->fd != -1
1336  && !impl->shutdown;
1337  // If a write operation would block, start monitoring the file
1338  // descriptor for I/O events.
1339  if (post_write && impl->poll && wouldblock) {
1340  int events = impl->events | IO_EVENT_OUT;
1341  // clang-format off
1342  if (!io_poll_watch(impl->poll, impl->fd, events,
1343  &impl->watch)) {
1344  // clang-format on
1345  impl->events = events;
1346  // Do not repost this task unless registering the file
1347  // descriptor fails.
1348  post_write = 0;
1349  }
1350  }
1351  impl->write_posted = post_write;
1352 #if !LELY_NO_THREADS
1353  pthread_mutex_unlock(&impl->mtx);
1354 #endif
1355 
1356  if (task && wouldblock)
1357  // The operation would block but was canceled before it could be
1358  // requeued.
1359  io_can_chan_write_post(
1360  io_can_chan_write_from_task(task), ECANCELED);
1361 
1362  if (post_rxbuf)
1363  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1364 
1365  if (post_write)
1366  ev_exec_post(impl->write_task.exec, &impl->write_task);
1367 
1368  errno = errsv;
1369 }
1370 
1371 static inline struct io_can_chan_impl *
1372 io_can_chan_impl_from_dev(const io_dev_t *dev)
1373 {
1374  assert(dev);
1375 
1376  return structof(dev, struct io_can_chan_impl, dev_vptr);
1377 }
1378 
1379 static inline struct io_can_chan_impl *
1380 io_can_chan_impl_from_chan(const io_can_chan_t *chan)
1381 {
1382  assert(chan);
1383 
1384  return structof(chan, struct io_can_chan_impl, chan_vptr);
1385 }
1386 
1387 static inline struct io_can_chan_impl *
1388 io_can_chan_impl_from_svc(const struct io_svc *svc)
1389 {
1390  assert(svc);
1391 
1392  return structof(svc, struct io_can_chan_impl, svc);
1393 }
1394 
1395 static void
1396 io_can_chan_impl_c_signal(struct spscring *ring, void *arg)
1397 {
1398  (void)ring;
1399  struct io_can_chan_impl *impl = arg;
1400  assert(impl);
1401 
1402 #if !LELY_NO_THREADS
1403  pthread_mutex_lock(&impl->mtx);
1404 #endif
1405  int post_read = !impl->read_posted && !sllist_empty(&impl->read_queue)
1406  && !impl->shutdown;
1407  if (post_read)
1408  impl->read_posted = 1;
1409 #if !LELY_NO_THREADS
1410  pthread_mutex_unlock(&impl->mtx);
1411 #endif
1412  // cppcheck-suppress duplicateCondition
1413  if (post_read)
1414  ev_exec_post(impl->read_task.exec, &impl->read_task);
1415 }
1416 
1417 static void
1418 io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
1419  struct sllist *read_queue, struct sllist *write_queue,
1420  struct sllist *confirm_queue, struct ev_task *task)
1421 {
1422  assert(impl);
1423  assert(read_queue);
1424  assert(write_queue);
1425  assert(confirm_queue);
1426 
1427  if (!task) {
1431  } else if (sllist_remove(&impl->read_queue, &task->_node)) {
1432  sllist_push_back(read_queue, &task->_node);
1433  } else if (sllist_remove(&impl->write_queue, &task->_node)) {
1434  sllist_push_back(write_queue, &task->_node);
1435  } else if (sllist_remove(&impl->confirm_queue, &task->_node)) {
1436  sllist_push_back(confirm_queue, &task->_node);
1437  }
1438 }
1439 
1440 static void
1441 io_can_chan_impl_do_read(struct io_can_chan_impl *impl, struct sllist *queue,
1442  int *pwouldblock)
1443 {
1444  assert(impl);
1445  assert(queue);
1446 
1447  int errsv = errno;
1448 
1449  int wouldblock = 0;
1450 
1451  struct slnode *node;
1452  while ((node = sllist_first(&impl->read_queue))) {
1453  struct ev_task *task = ev_task_from_node(node);
1454  struct io_can_chan_read *read =
1456 
1457 #if !LELY_NO_THREADS
1458  pthread_mutex_lock(&impl->c_mtx);
1459 #endif
1460 
1461  // Check if a frame is available in the receive queue.
1462  size_t n = 1;
1463  size_t i = spscring_c_alloc(&impl->rxring, &n);
1464  if (!n) {
1465 #if !LELY_NO_THREADS
1466  pthread_mutex_unlock(&impl->c_mtx);
1467 #endif
1468  wouldblock = 1;
1469  break;
1470  }
1471 
1472  // Copy the frame from the buffer.
1473  struct io_can_frame *frame = &impl->rxbuf[i];
1474  void *data = &frame->frame;
1475  int is_err = can_frame2can_err(data, read->err);
1476  if (!is_err && read->msg) {
1477 #if !LELY_NO_CANFD
1478  if (frame->nbytes == CANFD_MTU)
1479  canfd_frame2can_msg(data, read->msg);
1480  else
1481 #endif
1482  can_frame2can_msg(data, read->msg);
1483  }
1484  if (read->tp)
1485  *read->tp = frame->ts;
1486  spscring_c_commit(&impl->rxring, 1);
1487 
1488 #if !LELY_NO_THREADS
1489  pthread_mutex_unlock(&impl->c_mtx);
1490 #endif
1491 
1492  read->r.result = !is_err;
1493  read->r.errc = 0;
1494 
1495  sllist_pop_front(&impl->read_queue);
1496  sllist_push_back(queue, node);
1497  }
1498 
1499  if (pwouldblock)
1500  *pwouldblock = wouldblock;
1501 
1502  errno = errsv;
1503 }
1504 
1505 static void
1506 io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl, struct sllist *queue,
1507  const struct can_msg *msg)
1508 {
1509  assert(impl);
1510  assert(queue);
1511  assert(msg);
1512 
1513  // Find the matching write operation.
1514  struct slnode *node = sllist_first(&impl->confirm_queue);
1515  while (node) {
1517  ev_task_from_node(node));
1518  if (!can_msg_cmp(msg, write->msg))
1519  break;
1520  node = node->next;
1521  }
1522  if (!node)
1523  return;
1524 
1525  // Complete the matching write operation. Any preceding write operations
1526  // waiting for confirmation are considered to have failed.
1527  struct ev_task *task;
1528  while ((task = ev_task_from_node(
1529  sllist_pop_front(&impl->confirm_queue)))) {
1530  sllist_push_front(queue, &task->_node);
1531  struct io_can_chan_write *write =
1533  if (&task->_node == node) {
1534  write->errc = 0;
1535  break;
1536  } else {
1537  write->errc = EIO;
1538  }
1539  }
1540 }
1541 
1542 static size_t
1543 io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl)
1544 {
1545  assert(impl);
1546 
1547  size_t n = 0;
1548 
1549  // Try to abort io_can_chan_impl_rxbuf_task_func().
1550  // clang-format off
1551  if (impl->rxbuf_posted && ev_exec_abort(impl->rxbuf_task.exec,
1552  &impl->rxbuf_task)) {
1553  // clang-format on
1554  impl->rxbuf_posted = 0;
1555  n++;
1556  }
1557 
1558  // Try to abort io_can_chan_impl_read_task_func().
1559  // clang-format off
1560  if (impl->read_posted && ev_exec_abort(impl->read_task.exec,
1561  &impl->read_task)) {
1562  // clang-format on
1563  impl->read_posted = 0;
1564  n++;
1565  }
1566 
1567  // Try to abort io_can_chan_impl_write_task_func().
1568  // clang-format off
1569  if (impl->write_posted && ev_exec_abort(impl->write_task.exec,
1570  &impl->write_task)) {
1571  // clang-format on
1572  impl->write_posted = 0;
1573  n++;
1574  }
1575 
1576  return n;
1577 }
1578 
1579 static int
1580 io_can_chan_impl_set_fd(struct io_can_chan_impl *impl, int fd, int flags)
1581 {
1582  assert(impl);
1583  assert(!(flags & ~IO_CAN_BUS_FLAG_MASK));
1584 
1585  struct sllist read_queue, write_queue, confirm_queue;
1586  sllist_init(&read_queue);
1587  sllist_init(&write_queue);
1588  sllist_init(&confirm_queue);
1589 
1590 #if !LELY_NO_THREADS
1591  pthread_mutex_lock(&impl->mtx);
1592 #endif
1593 
1594  if (impl->events) {
1595  impl->events = 0;
1596  // Stop monitoring I/O events.
1597  io_poll_watch(impl->poll, impl->fd, impl->events, &impl->watch);
1598  }
1599 
1600 #if !LELY_NO_THREADS
1601  pthread_mutex_lock(&impl->c_mtx);
1602 #endif
1603  spscring_c_abort_wait(&impl->rxring);
1604  // Clear the receive queue.
1605  size_t n = SIZE_MAX;
1606  spscring_c_alloc(&impl->rxring, &n);
1607  if (n)
1608  spscring_c_commit(&impl->rxring, n);
1609 #if !LELY_NO_THREADS
1610  pthread_mutex_unlock(&impl->c_mtx);
1611 #endif
1612 
1613  int tmp = impl->fd;
1614  impl->fd = fd;
1615  fd = tmp;
1616 
1617  impl->flags = flags;
1618 
1619  // Cancel pending operations.
1620  sllist_append(&read_queue, &impl->read_queue);
1621  sllist_append(&write_queue, &impl->write_queue);
1622  sllist_append(&confirm_queue, &impl->confirm_queue);
1623 
1624  // Mark the ongoing write operation as canceled, if necessary.
1625  impl->current_write = NULL;
1626 
1627 #if !LELY_NO_THREADS
1628  pthread_mutex_unlock(&impl->mtx);
1629 #endif
1630 
1631  io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
1632  io_can_chan_write_queue_post(&write_queue, ECANCELED);
1633  io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
1634 
1635  return fd;
1636 }
1637 
1638 #endif // !LELY_NO_STDIO && __linux__
LELY_IO_RX_TIMEOUT
#define LELY_IO_RX_TIMEOUT
The default timeout (in milliseconds) for I/O read operations.
Definition: io2.h:31
ctx.h
io_can_chan_impl::confirm_queue
struct sllist confirm_queue
The queue containing write operations waiting to be confirmed.
Definition: can_chan.c:192
can_msg::flags
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI...
Definition: msg.h:94
io_can_chan_impl::events
int events
The I/O events currently being monitored by poll for fd.
Definition: can_chan.c:178
ev_task_from_node
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
io_can_chan_get_handle
int io_can_chan_get_handle(const io_can_chan_t *chan)
Returns the SocketCAN file descriptor associated with a CAN channel, or -1 if the channel is closed.
Definition: can_chan.c:418
io_can_chan_impl::exec
ev_exec_t * exec
A pointer to the executor used to execute all I/O tasks.
Definition: can_chan.c:149
ev_exec_t
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
IO_POLL_WATCH_INIT
#define IO_POLL_WATCH_INIT(func)
The static initializer for io_poll_watch.
Definition: poll.h:65
io_can_ctrl_get_index
unsigned int io_can_ctrl_get_index(const io_can_ctrl_t *ctrl)
Returns the interface index of a CAN controller.
Definition: can_ctrl.c:191
spscring.h
sllist_remove
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
io_can_chan_create
io_can_chan_t * io_can_chan_create(io_poll_t *poll, ev_exec_t *exec, size_t rxlen)
Creates a new CAN channel.
Definition: can_chan.c:382
CAN_FLAG_BRS
@ CAN_FLAG_BRS
The Bit Rate Switch (BRS) flag (only available in CAN FD format frames).
Definition: msg.h:62
sllist_first
struct slnode * sllist_first(const struct sllist *list)
Returns a pointer to the first node in a singly-linked list.
Definition: sllist.h:271
io_can_chan_read_from_task
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:93
can_msg_cmp
int can_msg_cmp(const void *p1, const void *p2)
Compares two CAN or CAN FD format frames.
Definition: msg.c:30
spscring_c_commit
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition: spscring.c:302
io_can_chan_impl::poll
io_poll_t * poll
A pointer to the polling instance used to watch for I/O events.
Definition: can_chan.c:143
IO_CAN_BUS_FLAG_ERR
@ IO_CAN_BUS_FLAG_ERR
Reception of error frames is enabled.
Definition: can.h:39
slnode::next
struct slnode * next
A pointer to the next node in the list.
Definition: sllist.h:42
io_svc_vtbl
The virtual table of an I/O service.
Definition: ctx.h:67
io_can_chan_release
int io_can_chan_release(io_can_chan_t *chan)
Dissociates and returns the SocketCAN file descriptor from a CAN channel.
Definition: can_chan.c:561
can_msg2canfd_frame
int can_msg2canfd_frame(const struct can_msg *src, struct canfd_frame *dst)
Converts a can_msg frame to a SocketCAN CAN FD frame.
Definition: can_msg.h:141
io_can_chan_impl::read_posted
unsigned read_posted
A flag indicating whether read_task has been posted to exec.
Definition: can_chan.c:184
io_can_chan_is_open
int io_can_chan_is_open(const io_can_chan_t *chan)
Returns 1 if the CAN channel is open and 0 if not.
Definition: can_chan.c:569
can_msg
A CAN or CAN FD format frame.
Definition: msg.h:87
IO_SVC_INIT
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
io_fd_recvmsg
ssize_t io_fd_recvmsg(int fd, struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX recvmsg(fd, msg, flags), except that if fd is non-blocking (or the implementation...
Definition: fd.c:80
io_dev_vtbl
Definition: dev.h:41
io_can_chan_write::task
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the write operation.
Definition: can.h:121
util.h
diag.h
io_poll_watch
An object representing a file descriptor being monitored for I/O events.
Definition: poll.h:56
IO_EVENT_IN
@ IO_EVENT_IN
Data (other than priority data) MAY be read without blocking.
Definition: event.h:35
io_can_chan_vtbl
Definition: can.h:149
io_can_chan_write::errc
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: can.h:126
sllist_push_front
void sllist_push_front(struct sllist *list, struct slnode *node)
Pushes a node to the front of a singly-linked list.
Definition: sllist.h:221
spscring_c_abort_wait
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
io_ctx_insert
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
io_can_chan_impl::fd
int fd
The SocketCAN file descriptor.
Definition: can_chan.c:174
io_can_chan_impl::shutdown
unsigned shutdown
A flag indicating whether the I/O service has been shut down.
Definition: can_chan.c:180
spscring_c_alloc
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:262
poll.h
spscring_c_submit_wait
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:327
io_can_chan_destroy
void io_can_chan_destroy(io_can_chan_t *chan)
Destroys a CAN channel.
Definition: can_chan.c:409
io_can_chan_close
int io_can_chan_close(io_can_chan_t *chan)
Closes the SocketCAN file descriptor associated with a CAN channel.
Definition: can_chan.c:575
can_err
A CAN error frame.
Definition: err.h:28
io_fd_sendmsg
ssize_t io_fd_sendmsg(int fd, const struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX sendmsg(fd, msg, flags | MSG_NOSIGNAL), except that if fd is non-blocking (or the...
Definition: fd.c:116
spscring_p_alloc
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:98
io_can_chan_read
A CAN channel read operation.
Definition: can.h:74
sllist_append
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
io_can_chan_read_result::errc
int errc
The error number, obtained as if by get_errc(), if result is -1.
Definition: can.h:70
io_can_chan_impl::write_posted
unsigned write_posted
A flag indicating whether write_task has been posted to exec.
Definition: can_chan.c:186
io_can_chan_impl::mtx
pthread_mutex_t mtx
The mutex protecting the file descriptor, the flags and the queues of pending operations.
Definition: can_chan.c:171
io_can_chan_impl::rxbuf
struct io_can_frame * rxbuf
The receive queue.
Definition: can_chan.c:165
ev_task_queue_post
size_t ev_task_queue_post(struct sllist *queue)
Post the tasks in queue to their respective executors and invokes ev_exec_on_task_fini() for each of ...
Definition: task.c:38
io_can_chan_read::msg
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
spscring_init
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
IO_CAN_BUS_FLAG_FDF
@ IO_CAN_BUS_FLAG_FDF
FD Format (formerly Extended Data Length) support is enabled.
Definition: can.h:42
io.h
io_can_chan_impl::flags
int flags
The flags with which fd has been opened.
Definition: can_chan.c:176
io_can_chan_write
A CAN channel write operation.
Definition: can.h:110
io_can_chan_t
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
io_can_chan_open
int io_can_chan_open(io_can_chan_t *chan, const io_can_ctrl_t *ctrl, int flags)
Opens a CAN channel.
Definition: can_chan.c:433
io_can_chan_impl::rxbuf_task
struct ev_task rxbuf_task
The task responsible for filling the receive queue.
Definition: can_chan.c:153
io_can_chan_read_result::result
int result
The result of the read operation: 1 if a CAN frame is received, 0 if an error frame is received,...
Definition: can.h:68
io_can_chan_write::msg
const struct can_msg * msg
A pointer to the CAN frame to be written.
Definition: can.h:116
io_can_chan_impl::current_write
struct ev_task * current_write
The write operation currently being executed.
Definition: can_chan.c:194
io_can_attr
Definition: can_attr.h:50
sllist_empty
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
io_can_chan_impl::c_mtx
pthread_mutex_t c_mtx
The mutex protecting the receive queue consumer.
Definition: can_chan.c:160
DIAG_WARNING
@ DIAG_WARNING
A warning.
Definition: diag.h:55
__io_poll
An I/O polling interface.
Definition: poll.c:51
io_poll_watch
int io_poll_watch(io_poll_t *poll, io_handle_t handle, struct io_event *event, int keep)
Registers an I/O device with an I/O polling interface and instructs it to watch for certain events.
Definition: poll.c:252
can_err.h
canfd_frame2can_msg
int canfd_frame2can_msg(const struct canfd_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN FD frame to a can_msg frame.
Definition: can_msg.h:112
io_can_chan_impl::read_queue
struct sllist read_queue
The queue containing pending read operations.
Definition: can_chan.c:188
ev_task_queue_abort
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
io_can_chan_impl::ctx
io_ctx_t * ctx
A pointer to the I/O context with which the channel is registered.
Definition: can_chan.c:147
io_poll_get_ctx
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition: poll.c:275
can_msg2can_frame
int can_msg2can_frame(const struct can_msg *src, struct can_frame *dst)
Converts a can_msg frame to a SocketCAN CAN frame.
Definition: can_msg.h:80
io_svc
An I/O service.
Definition: ctx.h:49
sllist_init
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
io_can_chan_impl
The implementation of a CAN channel.
Definition: can_chan.c:133
io_can_chan_impl::rxring
struct spscring rxring
The ring buffer used to control the receive queue.
Definition: can_chan.c:163
IO_EVENT_OUT
@ IO_EVENT_OUT
Data (both normal and priority data) MAY be written without blocking.
Definition: event.h:46
sllist_pop_front
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
ev_exec_abort
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:136
can_attr.h
io_can_chan_impl::read_task
struct ev_task read_task
The task responsible for initiating read operations.
Definition: can_chan.c:155
io_ctx_remove
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
io_can_frame
Definition: can_chan.c:60
time.h
io_can_chan_read::tp
struct timespec * tp
The address at which to store the system time at which the CAN frame or CAN error frame was received.
Definition: can.h:93
IO_EVENT_ERR
@ IO_EVENT_ERR
An error has occurred. This event is always reported.
Definition: event.h:48
io_can_chan_impl::watch
struct io_poll_watch watch
The object used to monitor the file descriptor for I/O events.
Definition: can_chan.c:151
diag
void diag(enum diag_severity severity, int errc, const char *format,...)
Emits a diagnostic message.
Definition: diag.c:171
sllist
A singly-linked list.
Definition: sllist.h:52
can_msg.h
unistd.h
io_can_chan_impl::chan_vptr
const struct io_can_chan_vtbl * chan_vptr
A pointer to the virtual table for the CAN channel interface.
Definition: can_chan.c:137
IO_CAN_BUS_FLAG_BRS
@ IO_CAN_BUS_FLAG_BRS
Bit Rate Switch support is enabled.
Definition: can.h:44
io_can_chan_read::err
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
LELY_IO_CAN_RXLEN
#define LELY_IO_CAN_RXLEN
The default SocketCAN receive queue length (in number of CAN frames).
Definition: can_chan.c:57
io_can_chan_assign
int io_can_chan_assign(io_can_chan_t *chan, int fd)
Assigns an existing SocketCAN file descriptor to a CAN channel.
Definition: can_chan.c:503
io_can_chan_write_from_task
struct io_can_chan_write * io_can_chan_write_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel write operation from a pointer to its completion task.
Definition: can.c:99
structof
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
ev_task
An executable task.
Definition: task.h:41
io_can_chan_impl::svc
struct io_svc svc
The I/O service representing the channel.
Definition: can_chan.c:145
io_dev_t
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
CAN_FLAG_FDF
@ CAN_FLAG_FDF
The FD Format (FDF) flag, formerly known as Extended Data Length (EDL).
Definition: msg.h:54
EV_TASK_INIT
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
spscring_p_capacity
size_t spscring_p_capacity(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer,...
Definition: spscring.c:65
ev_exec_on_task_init
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
io_ctx
Definition: ctx.c:38
spscring
A single-producer, single-consumer ring buffer.
Definition: spscring.h:63
ev_exec_post
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
can.h
io_can_chan_read::task
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
stdlib.h
spscring_p_commit
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition: spscring.c:138
slnode
A node in a singly-linked list.
Definition: sllist.h:40
io_can_chan_impl::dev_vptr
const struct io_dev_vtbl * dev_vptr
A pointer to the virtual table for the I/O device interface.
Definition: can_chan.c:135
can_frame2can_msg
int can_frame2can_msg(const struct can_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN frame to a can_msg frame.
Definition: can_msg.h:51
sllist_push_back
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
io_can_ctrl_get_flags
int io_can_ctrl_get_flags(const io_can_ctrl_t *ctrl)
Returns the flags specifying which CAN bus features are enabled.
Definition: can_ctrl.c:199
LELY_IO_TX_TIMEOUT
#define LELY_IO_TX_TIMEOUT
The default timeout (in milliseconds) for I/O write operations.
Definition: io2.h:36
io_can_chan_read::r
struct io_can_chan_read_result r
The result of the read operation.
Definition: can.h:100
ev_task::exec
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
io_can_chan_impl::rxbuf_posted
unsigned rxbuf_posted
A flag indicating whether rxbuf_task has been posted to exec.
Definition: can_chan.c:182
io_can_chan_impl::write_task
struct ev_task write_task
The task responsible for initiating write operations.
Definition: can_chan.c:157
io_can_ctrl_t
const struct io_can_ctrl_vtbl *const io_can_ctrl_t
An abstract CAN controller.
Definition: can.h:56
io_can_chan_impl::write_queue
struct sllist write_queue
The queue containing pending write operations.
Definition: can_chan.c:190