Lely core libraries  2.3.4
can_chan.c
Go to the documentation of this file.
1 
24 #include "io.h"
25 
26 #if !LELY_NO_STDIO && defined(__linux__)
27 
28 #include "../can.h"
29 #include <lely/io2/ctx.h>
30 #include <lely/io2/linux/can.h>
31 #include <lely/io2/posix/poll.h>
32 #include <lely/util/diag.h>
33 #include <lely/util/spscring.h>
34 #include <lely/util/time.h>
35 #include <lely/util/util.h>
36 
37 #include <assert.h>
38 #include <errno.h>
39 #include <stdlib.h>
40 
41 #if !LELY_NO_THREADS
42 #include <pthread.h>
43 #endif
44 #include <unistd.h>
45 
46 #include <linux/can/raw.h>
47 #include <linux/sockios.h>
48 #include <sys/ioctl.h>
49 
50 #include "../posix/fd.h"
51 #include "can_attr.h"
52 #include "can_err.h"
53 #include "can_msg.h"
54 
55 #ifndef LELY_IO_CAN_RXLEN
57 #define LELY_IO_CAN_RXLEN 1024
58 #endif
59 
/// A frame in SocketCAN format, together with its size and timestamp.
struct io_can_frame {
#if !LELY_NO_CANFD
	/// The frame storage (CAN FD layout, also used for classic frames).
	struct canfd_frame frame;
#else
	/// The frame storage.
	struct can_frame frame;
#endif
	/// The number of bytes of #frame in use (`CAN_MTU` or `CANFD_MTU`).
	size_t nbytes;
	/// The timestamp stored with the frame.
	struct timespec ts;
};
69 
// Helper functions operating directly on a SocketCAN file descriptor. The
// frame type depends on whether CAN FD support is compiled in.
static int io_can_fd_set_default(int fd);
#if LELY_NO_CANFD
static int io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes,
		int *pflags, struct timespec *tp, int timeout);
// The last parameter is a timeout, as in the definition and in the CAN FD
// variant below.
static int io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
		int timeout);
#else
static int io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes,
		int *pflags, struct timespec *tp, int timeout);
static int io_can_fd_write(int fd, const struct canfd_frame *frame,
		size_t nbytes, int timeout);
#endif
static int io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout);
86 
87 static io_ctx_t *io_can_chan_impl_dev_get_ctx(const io_dev_t *dev);
88 static ev_exec_t *io_can_chan_impl_dev_get_exec(const io_dev_t *dev);
89 static size_t io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task);
90 static size_t io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task);
91 
92 // clang-format off
93 static const struct io_dev_vtbl io_can_chan_impl_dev_vtbl = {
94  &io_can_chan_impl_dev_get_ctx,
95  &io_can_chan_impl_dev_get_exec,
96  &io_can_chan_impl_dev_cancel,
97  &io_can_chan_impl_dev_abort
98 };
99 // clang-format on
100 
101 static io_dev_t *io_can_chan_impl_get_dev(const io_can_chan_t *chan);
102 static int io_can_chan_impl_get_flags(const io_can_chan_t *chan);
103 static int io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
104  struct can_err *err, struct timespec *tp, int timeout);
105 static void io_can_chan_impl_submit_read(
106  io_can_chan_t *chan, struct io_can_chan_read *read);
107 static int io_can_chan_impl_write(
108  io_can_chan_t *chan, const struct can_msg *msg, int timeout);
109 static void io_can_chan_impl_submit_write(
110  io_can_chan_t *chan, struct io_can_chan_write *write);
111 
112 // clang-format off
113 static const struct io_can_chan_vtbl io_can_chan_impl_vtbl = {
114  &io_can_chan_impl_get_dev,
115  &io_can_chan_impl_get_flags,
116  &io_can_chan_impl_read,
117  &io_can_chan_impl_submit_read,
118  &io_can_chan_impl_write,
119  &io_can_chan_impl_submit_write
120 };
121 // clang-format on
122 
123 static void io_can_chan_impl_svc_shutdown(struct io_svc *svc);
124 
125 // clang-format off
126 static const struct io_svc_vtbl io_can_chan_impl_svc_vtbl = {
127  NULL,
128  &io_can_chan_impl_svc_shutdown
129 };
130 // clang-format on
131 
135  const struct io_dev_vtbl *dev_vptr;
145  struct io_svc svc;
151  struct io_poll_watch watch;
153  struct ev_task rxbuf_task;
155  struct ev_task read_task;
157  struct ev_task write_task;
158 #if !LELY_NO_THREADS
160  pthread_mutex_t c_mtx;
161 #endif
163  struct spscring rxring;
166 #if !LELY_NO_THREADS
171  pthread_mutex_t mtx;
172 #endif
174  int fd;
176  int flags;
178  int events;
180  unsigned shutdown : 1;
182  unsigned rxbuf_posted : 1;
184  unsigned read_posted : 1;
186  unsigned write_posted : 1;
188  struct sllist read_queue;
190  struct sllist write_queue;
192  struct sllist confirm_queue;
195 };
196 
197 static void io_can_chan_impl_watch_func(
198  struct io_poll_watch *watch, int events);
199 static void io_can_chan_impl_rxbuf_task_func(struct ev_task *task);
200 static void io_can_chan_impl_read_task_func(struct ev_task *task);
201 static void io_can_chan_impl_write_task_func(struct ev_task *task);
202 
203 static inline struct io_can_chan_impl *io_can_chan_impl_from_dev(
204  const io_dev_t *dev);
205 static inline struct io_can_chan_impl *io_can_chan_impl_from_chan(
206  const io_can_chan_t *chan);
207 static inline struct io_can_chan_impl *io_can_chan_impl_from_svc(
208  const struct io_svc *svc);
209 
210 static void io_can_chan_impl_c_signal(struct spscring *ring, void *arg);
211 
212 static void io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
213  struct sllist *read_queue, struct sllist *write_queue,
214  struct sllist *confirm_queue, struct ev_task *task);
215 
216 static void io_can_chan_impl_do_read(struct io_can_chan_impl *impl,
217  struct sllist *queue, int *pwouldblock);
218 static void io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl,
219  struct sllist *queue, const struct can_msg *msg);
220 
221 static size_t io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl);
222 
223 static int io_can_chan_impl_set_fd(
224  struct io_can_chan_impl *impl, int fd, int flags);
225 
226 void *
227 io_can_chan_alloc(void)
228 {
229  struct io_can_chan_impl *impl = malloc(sizeof(*impl));
230  if (!impl)
231  return NULL;
232  // Suppress a GCC maybe-uninitialized warning.
233  impl->chan_vptr = NULL;
234  // cppcheck-suppress memleak symbolName=impl
235  return &impl->chan_vptr;
236 }
237 
/**
 * Frees the storage obtained from io_can_chan_alloc().
 *
 * @param ptr a pointer to the channel (can be NULL, in which case nothing
 *            happens).
 */
void
io_can_chan_free(void *ptr)
{
	// The conversion below must not be applied to a null pointer.
	if (!ptr)
		return;
	free(io_can_chan_impl_from_chan(ptr));
}
244 
246 io_can_chan_init(io_can_chan_t *chan, io_poll_t *poll, ev_exec_t *exec,
247  size_t rxlen)
248 {
249  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
250  io_ctx_t *ctx = poll ? io_poll_get_ctx(poll) : NULL;
251 
252  if (!rxlen)
253  rxlen = LELY_IO_CAN_RXLEN;
254 
255  int errsv = 0;
256 
257  impl->dev_vptr = &io_can_chan_impl_dev_vtbl;
258  impl->chan_vptr = &io_can_chan_impl_vtbl;
259 
260  impl->poll = poll;
261 
262  impl->svc = (struct io_svc)IO_SVC_INIT(&io_can_chan_impl_svc_vtbl);
263  impl->ctx = ctx;
264 
265  impl->exec = exec;
266 
267  impl->watch = (struct io_poll_watch)IO_POLL_WATCH_INIT(
268  &io_can_chan_impl_watch_func);
269 
270  impl->rxbuf_task = (struct ev_task)EV_TASK_INIT(
271  impl->exec, &io_can_chan_impl_rxbuf_task_func);
272  impl->read_task = (struct ev_task)EV_TASK_INIT(
273  impl->exec, &io_can_chan_impl_read_task_func);
274  impl->write_task = (struct ev_task)EV_TASK_INIT(
275  impl->exec, &io_can_chan_impl_write_task_func);
276 
277 #if !LELY_NO_THREADS
278  if ((errsv = pthread_mutex_init(&impl->c_mtx, NULL)))
279  goto error_init_c_mtx;
280 #endif
281 
282  spscring_init(&impl->rxring, rxlen);
283  impl->rxbuf = calloc(rxlen, sizeof(struct io_can_frame));
284  if (!impl->rxbuf) {
285  errsv = errno;
286  goto error_alloc_rxbuf;
287  }
288 
289 #if !LELY_NO_THREADS
290  if ((errsv = pthread_mutex_init(&impl->mtx, NULL)))
291  goto error_init_mtx;
292 #endif
293 
294  impl->fd = -1;
295  impl->flags = 0;
296  impl->events = 0;
297 
298  impl->shutdown = 0;
299  impl->rxbuf_posted = 0;
300  impl->read_posted = 0;
301  impl->write_posted = 0;
302 
303  sllist_init(&impl->read_queue);
304  sllist_init(&impl->write_queue);
305  sllist_init(&impl->confirm_queue);
306  impl->current_write = NULL;
307 
308  if (impl->ctx)
309  io_ctx_insert(impl->ctx, &impl->svc);
310 
311  return chan;
312 
313 #if !LELY_NO_THREADS
314  // pthread_mutex_destroy(&impl->mtx);
315 error_init_mtx:
316 #endif
317  free(impl->rxbuf);
318 error_alloc_rxbuf:
319 #if !LELY_NO_THREADS
320  pthread_mutex_destroy(&impl->c_mtx);
321 error_init_c_mtx:
322 #endif
323  errno = errsv;
324  return NULL;
325 }
326 
327 void
328 io_can_chan_fini(io_can_chan_t *chan)
329 {
330  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
331 
332  if (impl->ctx)
333  io_ctx_remove(impl->ctx, &impl->svc);
334  // Cancel all pending operations.
335  io_can_chan_impl_svc_shutdown(&impl->svc);
336 
337 #if !LELY_NO_THREADS
338  // Abort any consumer wait operation running in a task.
339  pthread_mutex_lock(&impl->c_mtx);
341  pthread_mutex_unlock(&impl->c_mtx);
342 
343  int warning = 0;
344  pthread_mutex_lock(&impl->mtx);
345  // If necessary, busy-wait until io_can_chan_impl_rxbuf_task_func(),
346  // io_can_chan_impl_read_task_func() and
347  // io_can_chan_impl_write_task_func() complete.
348  while (impl->rxbuf_posted || impl->read_posted || impl->write_posted) {
349  if (io_can_chan_impl_do_abort_tasks(impl))
350  continue;
351  pthread_mutex_unlock(&impl->mtx);
352  if (!warning) {
353  warning = 1;
354  diag(DIAG_WARNING, 0,
355  "io_can_chan_fini() invoked with pending operations");
356  }
357  sched_yield();
358  pthread_mutex_lock(&impl->mtx);
359  }
360  pthread_mutex_unlock(&impl->mtx);
361 #endif
362 
363  // Close the socket.
364  if (impl->fd != -1) {
365  if (impl->events)
366  io_poll_watch(impl->poll, impl->fd, 0, &impl->watch);
367  close(impl->fd);
368  }
369 
370 #if !LELY_NO_THREADS
371  pthread_mutex_destroy(&impl->mtx);
372 #endif
373 
374  free(impl->rxbuf);
375 
376 #if !LELY_NO_THREADS
377  pthread_mutex_destroy(&impl->c_mtx);
378 #endif
379 }
380 
383 {
384  int errsv = 0;
385 
386  io_can_chan_t *chan = io_can_chan_alloc();
387  if (!chan) {
388  errsv = errno;
389  goto error_alloc;
390  }
391 
392  io_can_chan_t *tmp = io_can_chan_init(chan, poll, exec, rxlen);
393  if (!tmp) {
394  errsv = errno;
395  goto error_init;
396  }
397  chan = tmp;
398 
399  return chan;
400 
401 error_init:
402  io_can_chan_free((void *)chan);
403 error_alloc:
404  errno = errsv;
405  return NULL;
406 }
407 
408 void
410 {
411  if (chan) {
412  io_can_chan_fini(chan);
413  io_can_chan_free((void *)chan);
414  }
415 }
416 
417 int
419 {
420  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
421 
422 #if !LELY_NO_THREADS
423  pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
424 #endif
425  int fd = impl->fd;
426 #if !LELY_NO_THREADS
427  pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
428 #endif
429  return fd;
430 }
431 
432 int
434 {
435  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
436 
438  errno = EINVAL;
439  return -1;
440  }
441 
442  int errsv = 0;
443 
444  int fd = socket(AF_CAN, SOCK_RAW | SOCK_CLOEXEC, CAN_RAW);
445  if (fd == -1) {
446  errsv = errno;
447  goto error_socket;
448  }
449 
450  struct sockaddr_can addr = { .can_family = AF_CAN,
451  .can_ifindex = io_can_ctrl_get_index(ctrl) };
452 
453  if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
454  errsv = errno;
455  goto error_bind;
456  }
457 
458  if (flags & IO_CAN_BUS_FLAG_ERR) {
459  can_err_mask_t optval = CAN_ERR_MASK;
460  // clang-format off
461  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
462  sizeof(optval)) == -1) {
463  // clang-format on
464  errsv = errno;
465  goto error_setsockopt;
466  }
467  }
468 
469 #if !LELY_NO_CANFD
470  if (flags & IO_CAN_BUS_FLAG_FDF) {
471  int optval = 1;
472  // clang-format off
473  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
474  sizeof(optval)) == -1) {
475  // clang-format on
476  errsv = errno;
477  goto error_setsockopt;
478  }
479  }
480 #endif
481 
482  if (io_can_fd_set_default(fd) == -1) {
483  errsv = errno;
484  goto error_set_default;
485  }
486 
487  fd = io_can_chan_impl_set_fd(impl, fd, flags);
488  if (fd != -1)
489  close(fd);
490 
491  return 0;
492 
493 error_set_default:
494 error_setsockopt:
495 error_bind:
496  close(fd);
497 error_socket:
498  errno = errsv;
499  return -1;
500 }
501 
502 int
504 {
505  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
506 
507  struct sockaddr_can addr = { .can_family = AF_UNSPEC };
508  socklen_t addrlen = sizeof(addr);
509  if (getsockname(fd, (struct sockaddr *)&addr, &addrlen) == -1)
510  return -1;
511  if (addrlen < sizeof(addr) || addr.can_family != AF_CAN) {
512  errno = ENODEV;
513  return -1;
514  }
515  unsigned int ifindex = addr.can_ifindex;
516 
517  struct io_can_attr attr = IO_CAN_ATTR_INIT;
518  if (io_can_attr_get(&attr, ifindex) == -1)
519  return -1;
520  int flags = attr.flags;
521 
522  {
523  can_err_mask_t optval = 0;
524  socklen_t optlen = sizeof(optval);
525  // clang-format off
526  if (getsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
527  &optlen) == -1)
528  // clang-format on
529  return -1;
530  if (optval & CAN_ERR_MASK)
531  flags |= IO_CAN_BUS_FLAG_ERR;
532  }
533 
534 #if !LELY_NO_CANFD
535  // Check if CAN FD frames are allowed. If the check fails, we assume
536  // they are not.
537  {
538  int errsv = errno;
539  int optval = 0;
540  socklen_t optlen = sizeof(optval);
541  // clang-format off
542  if (!getsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
543  &optlen) && !optval)
544  // clang-format on
546  errno = errsv;
547  }
548 #endif
549 
550  if (io_can_fd_set_default(fd) == -1)
551  return -1;
552 
553  fd = io_can_chan_impl_set_fd(impl, fd, flags);
554  if (fd != -1)
555  close(fd);
556 
557  return 0;
558 }
559 
560 int
562 {
563  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
564 
565  return io_can_chan_impl_set_fd(impl, -1, 0);
566 }
567 
568 int
570 {
571  return io_can_chan_get_handle(chan) != -1;
572 }
573 
574 int
576 {
577  int fd = io_can_chan_release(chan);
578  return fd != -1 ? close(fd) : 0;
579 }
580 
/**
 * Applies the socket options every channel relies on: local loopback,
 * reception of own messages and a minimal send buffer.
 *
 * @returns 0 on success, or -1 if any setsockopt() call fails (`errno` is
 * left as set by that call).
 */
static int
io_can_fd_set_default(int fd)
{
	const int enable = 1;
	const int sndbuf = 0;

	// Enable local loopback.
	// clang-format off
	if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_LOOPBACK, &enable,
			sizeof(enable)) == -1)
		// clang-format on
		return -1;

	// Enable the reception of CAN frames sent by this socket so we can
	// check for successful transmission.
	// clang-format off
	if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_RECV_OWN_MSGS, &enable,
			sizeof(enable)) == -1)
		// clang-format on
		return -1;

	// Set the size of the send buffer to its minimum value. This causes
	// write operations to block (or return EAGAIN) instead of returning
	// ENOBUFS.
	// clang-format off
	if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf))
			== -1)
		// clang-format on
		return -1;

	return 0;
}
615 
/**
 * Reads a single frame from a SocketCAN file descriptor, discarding any
 * message whose size is not that of a complete frame.
 *
 * @param fd      the file descriptor.
 * @param frame   the destination of the frame.
 * @param pnbytes if not NULL, receives the number of bytes read.
 * @param pflags  if not NULL, receives the message flags (`msg_flags`).
 * @param tp      if not NULL, receives the timestamp of the frame, or zero
 *                for a write confirmation.
 * @param timeout the timeout passed to the first receive attempt.
 *
 * @returns 0 on success, or a negative value on error.
 */
static int
#if LELY_NO_CANFD
io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes, int *pflags,
		struct timespec *tp, int timeout)
#else
io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes, int *pflags,
		struct timespec *tp, int timeout)
#endif
{
	struct iovec iov = { .iov_base = (void *)frame,
		.iov_len = sizeof(*frame) };
	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };

	ssize_t nread;
	int complete;
	do {
		nread = io_fd_recvmsg(fd, &msg, 0, timeout);
		if (nread < 0)
			return nread;
		// A positive timeout is only used for the first attempt.
		if (timeout > 0)
			timeout = 0;
#if LELY_NO_CANFD
		complete = nread == CAN_MTU;
#else
		complete = nread == CAN_MTU || nread == CANFD_MTU;
#endif
	} while (!complete);

	if (pnbytes)
		*pnbytes = nread;

	if (pflags)
		*pflags = msg.msg_flags;

	if (!tp)
		return 0;

	if (msg.msg_flags & MSG_CONFIRM) {
		// Ignore the timestamp for write confirmations.
		*tp = (struct timespec){ 0, 0 };
		return 0;
	}

	// Obtain the timestamp of the last received frame from the socket.
	struct timeval tv = { 0, 0 };
	if (ioctl(fd, SIOCGSTAMP, &tv) == -1)
		return -1;
	tp->tv_sec = tv.tv_sec;
	tp->tv_nsec = tv.tv_usec * 1000;

	return 0;
}
665 
/**
 * Writes a frame of <b>nbytes</b> bytes to a SocketCAN file descriptor.
 *
 * @returns 0 if io_fd_sendmsg() reports a positive number of bytes sent, and
 * -1 otherwise.
 */
static int
#if LELY_NO_CANFD
io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
		int timeout)
#else
io_can_fd_write(int fd, const struct canfd_frame *frame, size_t nbytes,
		int timeout)
#endif
{
	struct iovec iov = { .iov_base = (void *)frame, .iov_len = nbytes };
	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };

	if (io_fd_sendmsg(fd, &msg, 0, timeout) > 0)
		return 0;
	return -1;
}
680 
681 static int
682 io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout)
683 {
684  assert(msg);
685 
686  // Convert the frame to the SocketCAN format.
687  struct io_can_frame frame;
688 #if !LELY_NO_CANFD
689  if (msg->flags & CAN_FLAG_FDF) {
690  if (can_msg2canfd_frame(msg, &frame.frame) == -1) {
691  errno = EINVAL;
692  return -1;
693  }
694  frame.nbytes = CANFD_MTU;
695  } else {
696 #endif
697  if (can_msg2can_frame(msg, (struct can_frame *)&frame.frame)
698  == -1) {
699  errno = EINVAL;
700  return -1;
701  }
702  frame.nbytes = CAN_MTU;
703 #if !LELY_NO_CANFD
704  }
705 #endif
706 
707  return io_can_fd_write(fd, &frame.frame, frame.nbytes, timeout);
708 }
709 
710 static io_ctx_t *
711 io_can_chan_impl_dev_get_ctx(const io_dev_t *dev)
712 {
713  const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
714 
715  return impl->ctx;
716 }
717 
718 static ev_exec_t *
719 io_can_chan_impl_dev_get_exec(const io_dev_t *dev)
720 {
721  const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
722 
723  return impl->exec;
724 }
725 
726 static size_t
727 io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task)
728 {
729  struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
730 
731  size_t n = 0;
732 
733  struct sllist read_queue, write_queue, confirm_queue;
734  sllist_init(&read_queue);
735  sllist_init(&write_queue);
736  sllist_init(&confirm_queue);
737 
738 #if !LELY_NO_THREADS
739  pthread_mutex_lock(&impl->mtx);
740 #endif
741  io_can_chan_impl_do_pop(
742  impl, &read_queue, &write_queue, &confirm_queue, task);
743  // Mark the ongoing write operation as canceled, if necessary.
744  if (impl->current_write && (!task || task == impl->current_write)) {
745  impl->current_write = NULL;
746  n++;
747  }
748 #if !LELY_NO_THREADS
749  pthread_mutex_unlock(&impl->mtx);
750 #endif
751 
752  size_t nread = io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
753  n = n < SIZE_MAX - nread ? n + nread : SIZE_MAX;
754  size_t nwrite = io_can_chan_write_queue_post(&write_queue, ECANCELED);
755  n = n < SIZE_MAX - nwrite ? n + nwrite : SIZE_MAX;
756  size_t nconfirm =
757  io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
758  n = n < SIZE_MAX - nconfirm ? n + nconfirm : SIZE_MAX;
759 
760  return n;
761 }
762 
763 static size_t
764 io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task)
765 {
766  struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
767 
768  struct sllist queue;
769  sllist_init(&queue);
770 
771 #if !LELY_NO_THREADS
772  pthread_mutex_lock(&impl->mtx);
773 #endif
774  io_can_chan_impl_do_pop(impl, &queue, &queue, &queue, task);
775 #if !LELY_NO_THREADS
776  pthread_mutex_unlock(&impl->mtx);
777 #endif
778 
779  return ev_task_queue_abort(&queue);
780 }
781 static io_dev_t *
782 io_can_chan_impl_get_dev(const io_can_chan_t *chan)
783 {
784  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
785 
786  return &impl->dev_vptr;
787 }
788 
789 static int
790 io_can_chan_impl_get_flags(const io_can_chan_t *chan)
791 {
792  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
793 
794 #if !LELY_NO_THREADS
795  pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
796 #endif
797  int flags = impl->flags;
798 #if !LELY_NO_THREADS
799  pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
800 #endif
801  return flags;
802 }
803 
804 static int
805 io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
806  struct can_err *err, struct timespec *tp, int timeout)
807 {
808  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
809 
810  struct io_can_frame frame_;
811  struct io_can_frame *frame = &frame_;
812 #if !LELY_NO_THREADS
813  pthread_mutex_lock(&impl->c_mtx);
814 #endif
815  size_t n = 0;
816  for (;;) {
817  // Check if a frame is available in the receive queue.
818  n = 1;
819  size_t i = spscring_c_alloc(&impl->rxring, &n);
820  if (n) {
821  frame = &impl->rxbuf[i];
822  break;
823  }
824 #if !LELY_NO_THREADS
825  pthread_mutex_unlock(&impl->c_mtx);
826  pthread_mutex_lock(&impl->mtx);
827 #endif
828  // If not, read a frame directly.
829  int fd = impl->fd;
830 #if !LELY_NO_THREADS
831  pthread_mutex_unlock(&impl->mtx);
832 #endif
833  int flags = 0;
834  // clang-format off
835  if (io_can_fd_read(fd, &frame->frame, &frame->nbytes, &flags,
836  &frame->ts, timeout) < 0)
837  // clang-format on
838  return -1;
839  // Process the frame unless it is a write confirmation.
840  if (!(flags & MSG_CONFIRM))
841  break;
842  // Convert the frame from the SocketCAN format.
843  void *src = &frame->frame;
844  struct can_msg msg;
845 #if !LELY_NO_CANFD
846  if (frame->nbytes == CANFD_MTU)
847  canfd_frame2can_msg(src, &msg);
848  else
849 #endif
850  can_frame2can_msg(src, &msg);
851  // Process the write confirmation.
852  struct sllist queue;
853  sllist_init(&queue);
854 #if !LELY_NO_THREADS
855  pthread_mutex_lock(&impl->mtx);
856 #endif
857  io_can_chan_impl_do_confirm(impl, &queue, &msg);
858 #if !LELY_NO_THREADS
859  pthread_mutex_unlock(&impl->mtx);
860 #endif
861  ev_task_queue_post(&queue);
862  // Since the timeout is relative, we can only use a positive
863  // value once.
864  if (timeout > 0)
865  timeout = 0;
866 #if !LELY_NO_THREADS
867  pthread_mutex_lock(&impl->c_mtx);
868 #endif
869  }
870  // Parse the frame.
871  void *data = &frame->frame;
872  int is_err = can_frame2can_err(data, err);
873  if (!is_err && msg) {
874 #if !LELY_NO_CANFD
875  if (frame->nbytes == CANFD_MTU)
876  canfd_frame2can_msg(data, msg);
877  else
878 #endif
879  can_frame2can_msg(data, msg);
880  }
881  if (tp)
882  *tp = frame->ts;
883  // Remove the frame from the receive queue, if necessary.
884  if (n) {
885  spscring_c_commit(&impl->rxring, n);
886 #if !LELY_NO_THREADS
887  pthread_mutex_unlock(&impl->c_mtx);
888 #endif
889  }
890 
891  return is_err == -1 ? -1 : !is_err;
892 }
893 
894 static void
895 io_can_chan_impl_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
896 {
897  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
898  assert(read);
899  struct ev_task *task = &read->task;
900 
901  if (!task->exec)
902  task->exec = impl->exec;
903  assert(task->exec);
904  ev_exec_on_task_init(task->exec);
905 
906 #if !LELY_NO_THREADS
907  pthread_mutex_lock(&impl->mtx);
908 #endif
909  if (impl->shutdown) {
910 #if !LELY_NO_THREADS
911  pthread_mutex_unlock(&impl->mtx);
912 #endif
913  io_can_chan_read_post(read, -1, ECANCELED);
914  } else {
915  int post_read = !impl->read_posted
916  && sllist_empty(&impl->read_queue);
917  sllist_push_back(&impl->read_queue, &task->_node);
918  if (post_read)
919  impl->read_posted = 1;
920 #if !LELY_NO_THREADS
921  pthread_mutex_unlock(&impl->mtx);
922 #endif
923  assert(impl->read_task.exec);
924  if (post_read)
925  ev_exec_post(impl->read_task.exec, &impl->read_task);
926  }
927 }
928 
929 static int
930 io_can_chan_impl_write(
931  io_can_chan_t *chan, const struct can_msg *msg, int timeout)
932 {
933  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
934 
935 #if !LELY_NO_CANFD
936  int flags = 0;
937  if (msg->flags & CAN_FLAG_FDF)
939  if (msg->flags & CAN_FLAG_BRS)
941 #endif
942 
943 #if !LELY_NO_THREADS
944  pthread_mutex_lock(&impl->mtx);
945 #endif
946 #if !LELY_NO_CANFD
947  if ((flags & impl->flags) != flags) {
948 #if !LELY_NO_THREADS
949  pthread_mutex_unlock(&impl->mtx);
950 #endif
951  errno = EINVAL;
952  return -1;
953  }
954 #endif
955  int fd = impl->fd;
956 #if !LELY_NO_THREADS
957  pthread_mutex_unlock(&impl->mtx);
958 #endif
959 
960  return io_can_fd_write_msg(fd, msg, timeout);
961 }
962 
963 static void
964 io_can_chan_impl_submit_write(
965  io_can_chan_t *chan, struct io_can_chan_write *write)
966 {
967  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
968  assert(write);
969  assert(write->msg);
970  struct ev_task *task = &write->task;
971 
972 #if !LELY_NO_CANFD
973  int flags = 0;
974  if (write->msg->flags & CAN_FLAG_FDF)
975  flags |= IO_CAN_BUS_FLAG_FDF;
976  if (write->msg->flags & CAN_FLAG_BRS)
977  flags |= IO_CAN_BUS_FLAG_BRS;
978 #endif
979 
980  if (!task->exec)
981  task->exec = impl->exec;
982  assert(task->exec);
983  ev_exec_on_task_init(task->exec);
984 
985 #if !LELY_NO_THREADS
986  pthread_mutex_lock(&impl->mtx);
987 #endif
988  if (impl->shutdown) {
989 #if !LELY_NO_THREADS
990  pthread_mutex_unlock(&impl->mtx);
991 #endif
992  io_can_chan_write_post(write, ECANCELED);
993 #if !LELY_NO_CANFD
994  } else if ((flags & impl->flags) != flags) {
995 #if !LELY_NO_THREADS
996  pthread_mutex_unlock(&impl->mtx);
997 #endif
998  io_can_chan_write_post(write, EINVAL);
999 #endif
1000  } else {
1001  int post_write = !impl->write_posted
1002  && sllist_empty(&impl->write_queue);
1003  sllist_push_back(&impl->write_queue, &task->_node);
1004  if (post_write)
1005  impl->write_posted = 1;
1006 #if !LELY_NO_THREADS
1007  pthread_mutex_unlock(&impl->mtx);
1008 #endif
1009  assert(impl->write_task.exec);
1010  if (post_write)
1011  ev_exec_post(impl->write_task.exec, &impl->write_task);
1012  }
1013 }
1014 
1015 static void
1016 io_can_chan_impl_svc_shutdown(struct io_svc *svc)
1017 {
1018  struct io_can_chan_impl *impl = io_can_chan_impl_from_svc(svc);
1019  io_dev_t *dev = &impl->dev_vptr;
1020 
1021 #if !LELY_NO_THREADS
1022  pthread_mutex_lock(&impl->mtx);
1023 #endif
1024  int shutdown = !impl->shutdown;
1025  impl->shutdown = 1;
1026  if (shutdown) {
1027  if (impl->events) {
1028  impl->events = 0;
1029  // Stop monitoring I/O events.
1030  io_poll_watch(impl->poll, impl->fd, impl->events,
1031  &impl->watch);
1032  }
1033  // Try to abort io_can_chan_impl_rxbuf_task_func(),
1034  // io_can_chan_impl_read_task_func() and
1035  // io_can_chan_impl_write_task_func().
1036  io_can_chan_impl_do_abort_tasks(impl);
1037  }
1038 #if !LELY_NO_THREADS
1039  pthread_mutex_unlock(&impl->mtx);
1040 #endif
1041  // cppcheck-suppress duplicateCondition
1042  if (shutdown)
1043  // Cancel all pending operations.
1044  io_can_chan_impl_dev_cancel(dev, NULL);
1045 }
1046 
1047 static void
1048 io_can_chan_impl_watch_func(struct io_poll_watch *watch, int events)
1049 {
1050  assert(watch);
1051  struct io_can_chan_impl *impl =
1053 
1054 #if !LELY_NO_THREADS
1055  pthread_mutex_lock(&impl->mtx);
1056 #endif
1057  // Continue monitoring events that are not otherwise handled.
1058  if ((events & IO_EVENT_ERR) || impl->fd == -1 || impl->shutdown) {
1059  impl->events = 0;
1060  } else if ((impl->events &= ~events) != 0) {
1061  int errsv = errno;
1062  // clang-format off
1063  if (io_poll_watch(impl->poll, impl->fd, impl->events,
1064  &impl->watch) == -1) {
1065  // clang-format on
1066  impl->events = 0;
1067  events |= IO_EVENT_ERR;
1068  }
1069  errno = errsv;
1070  }
1071 
1072  // Process incoming CAN frames.
1073  int post_rxbuf = 0;
1074  if ((events & (IO_EVENT_IN | IO_EVENT_ERR)) && !impl->shutdown) {
1075  post_rxbuf = !impl->rxbuf_posted;
1076  impl->rxbuf_posted = 1;
1077  }
1078 
1079  // Retry any pending write operations.
1080  int post_write = 0;
1081  if ((events & (IO_EVENT_OUT | IO_EVENT_ERR))
1082  && !sllist_empty(&impl->write_queue)
1083  && !impl->shutdown) {
1084  post_write = !impl->write_posted;
1085  impl->write_posted = 1;
1086  }
1087 #if !LELY_NO_THREADS
1088  pthread_mutex_unlock(&impl->mtx);
1089 #endif
1090 
1091  if (post_rxbuf)
1092  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1093  if (post_write)
1094  ev_exec_post(impl->write_task.exec, &impl->write_task);
1095 }
1096 
1097 static void
1098 io_can_chan_impl_rxbuf_task_func(struct ev_task *task)
1099 {
1100  assert(task);
1101  struct io_can_chan_impl *impl =
1102  structof(task, struct io_can_chan_impl, rxbuf_task);
1103 
1104  int errsv = errno;
1105 
1106  struct sllist queue;
1107  sllist_init(&queue);
1108 
1109  int result = 0;
1110  int errc = 0;
1111  int wouldblock = 0;
1112 
1113 #if !LELY_NO_THREADS
1114  pthread_mutex_lock(&impl->mtx);
1115 #endif
1116  // Only try to fill the receive queue if there are pending read
1117  // operations and there is an empty slot in the queue, or if there are
1118  // pending write confirmations.
1119  // clang-format off
1120  while ((!sllist_empty(&impl->read_queue)
1121  && spscring_p_capacity(&impl->rxring))
1122  || !sllist_empty(&impl->confirm_queue)) {
1123  // clang-format on
1124  int fd = impl->fd;
1125 #if !LELY_NO_THREADS
1126  pthread_mutex_unlock(&impl->mtx);
1127 #endif
1128 
1129  struct io_can_frame frame_;
1130  struct io_can_frame *frame = &frame_;
1131  // Try to obtain an empty slot in the receive queue.
1132  size_t n = 1;
1133  size_t i = spscring_p_alloc(&impl->rxring, &n);
1134  if (n)
1135  frame = &impl->rxbuf[i];
1136 
1137  // Try to read a CAN or CAN FD format frame from the CAN bus.
1138  int flags = 0;
1139  result = io_can_fd_read(fd, &frame->frame, &frame->nbytes,
1140  &flags, &frame->ts,
1141  impl->poll ? 0 : LELY_IO_RX_TIMEOUT);
1142  errc = !result ? 0 : errno;
1143  wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1144 
1145  // Convert the frame from the SocketCAN format if it is a write
1146  // confirmation.
1147  struct can_msg msg;
1148  if (!result && (flags & MSG_CONFIRM)) {
1149  void *src = &frame->frame;
1150 #if !LELY_NO_CANFD
1151  if (frame->nbytes == CANFD_MTU)
1152  canfd_frame2can_msg(src, &msg);
1153  else
1154 #endif
1155  can_frame2can_msg(src, &msg);
1156  }
1157 
1158  // Make the frame available for reading.
1159  if (!result && !(flags & MSG_CONFIRM))
1160  spscring_p_commit(&impl->rxring, n);
1161 
1162 #if !LELY_NO_THREADS
1163  pthread_mutex_lock(&impl->mtx);
1164 #endif
1165  // Process the write confirmation, if any.
1166  if (!result && flags & MSG_CONFIRM)
1167  io_can_chan_impl_do_confirm(impl, &queue, &msg);
1168 
1169  // Stop if the operation did or would block, or if an error
1170  // occurred.
1171  if (!impl->poll || result < 0)
1172  break;
1173  }
1174  // Cancel all pending read operations on error.
1175  if (result < 0 && !wouldblock) {
1176  io_can_chan_impl_do_read(impl, &queue, NULL);
1177  while ((task = ev_task_from_node(
1178  sllist_pop_front(&impl->read_queue)))) {
1179  struct io_can_chan_read *read =
1181  read->r.result = result;
1182  read->r.errc = errc;
1183  sllist_push_back(&queue, &task->_node);
1184  }
1185  }
1186  // clang-format off
1187  int post_rxbuf = !(sllist_empty(&impl->read_queue)
1188  && sllist_empty(&impl->confirm_queue))
1189  && impl->fd != -1 && !impl->shutdown;
1190  // clang-format on
1191  // If a read operation would block, start monitoring the file descriptor
1192  // for I/O events.
1193  if (post_rxbuf && impl->poll && wouldblock) {
1194  int events = impl->events | IO_EVENT_IN;
1195  // clang-format off
1196  if (!io_poll_watch(impl->poll, impl->fd, events,
1197  &impl->watch)) {
1198  // clang-format on
1199  impl->events = events;
1200  // Do not repost this thask unless registering the file
1201  // descriptor fails.
1202  post_rxbuf = 0;
1203  }
1204  }
1205  impl->rxbuf_posted = post_rxbuf;
1206 #if !LELY_NO_THREADS
1207  pthread_mutex_unlock(&impl->mtx);
1208 #endif
1209 
1210  ev_task_queue_post(&queue);
1211 
1212  if (post_rxbuf)
1213  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1214 
1215  errno = errsv;
1216 }
1217 
1218 static void
1219 io_can_chan_impl_read_task_func(struct ev_task *task)
1220 {
1221  assert(task);
1222  struct io_can_chan_impl *impl =
1223  structof(task, struct io_can_chan_impl, read_task);
1224 
1225  struct sllist queue;
1226  sllist_init(&queue);
1227 
1228 #if !LELY_NO_THREADS
1229  pthread_mutex_lock(&impl->mtx);
1230 #endif
1231  // Process any pending read operations that can be satisfied.
1232  int wouldblock = 0;
1233  io_can_chan_impl_do_read(impl, &queue, &wouldblock);
1234  int post_rxbuf = 0;
1235  // Repost this task if any read operations remain in the queue.
1236  int post_read = !sllist_empty(&impl->read_queue) && !impl->shutdown;
1237  // Register a wait operation if the receive queue is empty.
1238  if (post_read && wouldblock) {
1239 #if !LELY_NO_THREADS
1240  pthread_mutex_lock(&impl->c_mtx);
1241 #endif
1242  // Do not repost this task unless the wait condition can be
1243  // satisfied immediately.
1244  post_read = !spscring_c_submit_wait(&impl->rxring, 1,
1245  io_can_chan_impl_c_signal, impl);
1246 #if !LELY_NO_THREADS
1247  pthread_mutex_unlock(&impl->c_mtx);
1248 #endif
1249  if (!post_read)
1250  // If the receive queue is empty, start reading more CAN
1251  // frames, unless we're already waiting for one.
1252  post_rxbuf = !impl->rxbuf_posted
1253  && !(impl->events & IO_EVENT_IN)
1254  && impl->fd != -1 && !impl->shutdown;
1255  }
1256  // cppcheck-suppress knownConditionTrueFalse
1257  if (post_rxbuf)
1258  impl->rxbuf_posted = 1;
1259  impl->read_posted = post_read;
1260 #if !LELY_NO_THREADS
1261  pthread_mutex_unlock(&impl->mtx);
1262 #endif
1263 
1264  ev_task_queue_post(&queue);
1265 
1266  // cppcheck-suppress knownConditionTrueFalse
1267  if (post_rxbuf)
1268  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1269 
1270  if (post_read)
1271  ev_exec_post(impl->read_task.exec, &impl->read_task);
1272 }
1273 
1274 static void
1275 io_can_chan_impl_write_task_func(struct ev_task *task)
1276 {
1277  assert(task);
1278  struct io_can_chan_impl *impl =
1279  structof(task, struct io_can_chan_impl, write_task);
1280 
1281  int errsv = errno;
1282 
1283  int wouldblock = 0;
1284 
1285 #if !LELY_NO_THREADS
1286  pthread_mutex_lock(&impl->mtx);
1287 #endif
1288  // Try to process all pending write operations at once, unless we're in
1289  // blocking mode.
1290  while ((task = impl->current_write = ev_task_from_node(
1291  sllist_pop_front(&impl->write_queue)))) {
1292  int fd = impl->fd;
1293 #if !LELY_NO_THREADS
1294  pthread_mutex_unlock(&impl->mtx);
1295 #endif
1296  struct io_can_chan_write *write =
1298  int result = io_can_fd_write_msg(fd, write->msg,
1299  impl->poll ? 0 : LELY_IO_TX_TIMEOUT);
1300  int errc = !result ? 0 : errno;
1301  wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1302  if (!wouldblock && errc)
1303  // The operation failed immediately.
1304  io_can_chan_write_post(write, errc);
1305 #if !LELY_NO_THREADS
1306  pthread_mutex_lock(&impl->mtx);
1307 #endif
1308  if (!errc)
1309  // Wait for the write confirmation.
1310  sllist_push_back(&impl->confirm_queue, &task->_node);
1311  if (task == impl->current_write) {
1312  // Put the write operation back on the queue if it would
1313  // block, unless it was canceled.
1314  if (wouldblock) {
1316  &task->_node);
1317  task = NULL;
1318  }
1319  impl->current_write = NULL;
1320  }
1321  assert(!impl->current_write);
1322  // Stop if the operation did or would block.
1323  if (!impl->poll || wouldblock)
1324  break;
1325  }
1326  // If we're waiting for a write confirmation, start reading more CAN
1327  // frames, unless we're already waiting for one.
1328  int post_rxbuf = !impl->rxbuf_posted
1329  && !sllist_empty(&impl->confirm_queue)
1330  && !(impl->events & IO_EVENT_IN) && impl->fd != -1
1331  && !impl->shutdown;
1332  if (post_rxbuf)
1333  impl->rxbuf_posted = 1;
1334  // Repost this task if any write operations remain in the queue.
1335  int post_write = !sllist_empty(&impl->write_queue) && impl->fd != -1
1336  && !impl->shutdown;
1337  // If a write operation would block, start monitoring the file
1338  // descriptor for I/O events.
1339  if (post_write && impl->poll && wouldblock) {
1340  int events = impl->events | IO_EVENT_OUT;
1341  // clang-format off
1342  if (!io_poll_watch(impl->poll, impl->fd, events,
1343  &impl->watch)) {
1344  // clang-format on
1345  impl->events = events;
1346  // Do not repost this task unless registering the file
1347  // descriptor fails.
1348  post_write = 0;
1349  }
1350  }
1351  impl->write_posted = post_write;
1352 #if !LELY_NO_THREADS
1353  pthread_mutex_unlock(&impl->mtx);
1354 #endif
1355 
1356  if (task && wouldblock)
1357  // The operation would block but was canceled before it could be
1358  // requeued.
1359  io_can_chan_write_post(
1360  io_can_chan_write_from_task(task), ECANCELED);
1361 
1362  if (post_rxbuf)
1363  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1364 
1365  if (post_write)
1366  ev_exec_post(impl->write_task.exec, &impl->write_task);
1367 
1368  errno = errsv;
1369 }
1370 
1371 static inline struct io_can_chan_impl *
1372 io_can_chan_impl_from_dev(const io_dev_t *dev)
1373 {
1374  assert(dev);
1375 
1376  return structof(dev, struct io_can_chan_impl, dev_vptr);
1377 }
1378 
1379 static inline struct io_can_chan_impl *
1380 io_can_chan_impl_from_chan(const io_can_chan_t *chan)
1381 {
1382  assert(chan);
1383 
1384  return structof(chan, struct io_can_chan_impl, chan_vptr);
1385 }
1386 
1387 static inline struct io_can_chan_impl *
1388 io_can_chan_impl_from_svc(const struct io_svc *svc)
1389 {
1390  assert(svc);
1391 
1392  return structof(svc, struct io_can_chan_impl, svc);
1393 }
1394 
1395 static void
1396 io_can_chan_impl_c_signal(struct spscring *ring, void *arg)
1397 {
1398  (void)ring;
1399  struct io_can_chan_impl *impl = arg;
1400  assert(impl);
1401 
1402 #if !LELY_NO_THREADS
1403  pthread_mutex_lock(&impl->mtx);
1404 #endif
1405  int post_read = !impl->read_posted && !sllist_empty(&impl->read_queue)
1406  && !impl->shutdown;
1407  if (post_read)
1408  impl->read_posted = 1;
1409 #if !LELY_NO_THREADS
1410  pthread_mutex_unlock(&impl->mtx);
1411 #endif
1412  // cppcheck-suppress duplicateCondition
1413  if (post_read)
1414  ev_exec_post(impl->read_task.exec, &impl->read_task);
1415 }
1416 
1417 static void
1418 io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
1419  struct sllist *read_queue, struct sllist *write_queue,
1420  struct sllist *confirm_queue, struct ev_task *task)
1421 {
1422  assert(impl);
1423  assert(read_queue);
1424  assert(write_queue);
1425  assert(confirm_queue);
1426 
1427  if (!task) {
1431  } else if (sllist_remove(&impl->read_queue, &task->_node)) {
1432  sllist_push_back(read_queue, &task->_node);
1433  } else if (sllist_remove(&impl->write_queue, &task->_node)) {
1434  sllist_push_back(write_queue, &task->_node);
1435  } else if (sllist_remove(&impl->confirm_queue, &task->_node)) {
1436  sllist_push_back(confirm_queue, &task->_node);
1437  }
1438 }
1439 
1440 static void
1441 io_can_chan_impl_do_read(struct io_can_chan_impl *impl, struct sllist *queue,
1442  int *pwouldblock)
1443 {
1444  assert(impl);
1445  assert(queue);
1446 
1447  int errsv = errno;
1448 
1449  int wouldblock = 0;
1450 
1451  struct slnode *node;
1452  while ((node = sllist_first(&impl->read_queue))) {
1453  struct ev_task *task = ev_task_from_node(node);
1454  struct io_can_chan_read *read =
1456 
1457 #if !LELY_NO_THREADS
1458  pthread_mutex_lock(&impl->c_mtx);
1459 #endif
1460 
1461  // Check if a frame is available in the receive queue.
1462  size_t n = 1;
1463  size_t i = spscring_c_alloc(&impl->rxring, &n);
1464  if (!n) {
1465 #if !LELY_NO_THREADS
1466  pthread_mutex_unlock(&impl->c_mtx);
1467 #endif
1468  wouldblock = 1;
1469  break;
1470  }
1471 
1472  // Copy the frame from the buffer.
1473  struct io_can_frame *frame = &impl->rxbuf[i];
1474  void *data = &frame->frame;
1475  int is_err = can_frame2can_err(data, read->err);
1476  if (!is_err && read->msg) {
1477 #if !LELY_NO_CANFD
1478  if (frame->nbytes == CANFD_MTU)
1479  canfd_frame2can_msg(data, read->msg);
1480  else
1481 #endif
1482  can_frame2can_msg(data, read->msg);
1483  }
1484  if (read->tp)
1485  *read->tp = frame->ts;
1486  spscring_c_commit(&impl->rxring, 1);
1487 
1488 #if !LELY_NO_THREADS
1489  pthread_mutex_unlock(&impl->c_mtx);
1490 #endif
1491 
1492  read->r.result = !is_err;
1493  read->r.errc = 0;
1494 
1495  sllist_pop_front(&impl->read_queue);
1496  sllist_push_back(queue, node);
1497  }
1498 
1499  if (pwouldblock)
1500  *pwouldblock = wouldblock;
1501 
1502  errno = errsv;
1503 }
1504 
1505 static void
1506 io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl, struct sllist *queue,
1507  const struct can_msg *msg)
1508 {
1509  assert(impl);
1510  assert(queue);
1511  assert(msg);
1512 
1513  // Find the matching write operation.
1514  struct slnode *node = sllist_first(&impl->confirm_queue);
1515  while (node) {
1517  ev_task_from_node(node));
1518  if (!can_msg_cmp(msg, write->msg))
1519  break;
1520  node = node->next;
1521  }
1522  if (!node)
1523  return;
1524 
1525  // Complete the matching write operation. Any preceding write operations
1526  // waiting for confirmation are considered to have failed.
1527  struct ev_task *task;
1528  while ((task = ev_task_from_node(
1529  sllist_pop_front(&impl->confirm_queue)))) {
1530  sllist_push_front(queue, &task->_node);
1531  struct io_can_chan_write *write =
1533  if (&task->_node == node) {
1534  write->errc = 0;
1535  break;
1536  } else {
1537  write->errc = EIO;
1538  }
1539  }
1540 }
1541 
1542 static size_t
1543 io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl)
1544 {
1545  assert(impl);
1546 
1547  size_t n = 0;
1548 
1549  // Try to abort io_can_chan_impl_rxbuf_task_func().
1550  // clang-format off
1551  if (impl->rxbuf_posted && ev_exec_abort(impl->rxbuf_task.exec,
1552  &impl->rxbuf_task)) {
1553  // clang-format on
1554  impl->rxbuf_posted = 0;
1555  n++;
1556  }
1557 
1558  // Try to abort io_can_chan_impl_read_task_func().
1559  // clang-format off
1560  if (impl->read_posted && ev_exec_abort(impl->read_task.exec,
1561  &impl->read_task)) {
1562  // clang-format on
1563  impl->read_posted = 0;
1564  n++;
1565  }
1566 
1567  // Try to abort io_can_chan_impl_write_task_func().
1568  // clang-format off
1569  if (impl->write_posted && ev_exec_abort(impl->write_task.exec,
1570  &impl->write_task)) {
1571  // clang-format on
1572  impl->write_posted = 0;
1573  n++;
1574  }
1575 
1576  return n;
1577 }
1578 
1579 static int
1580 io_can_chan_impl_set_fd(struct io_can_chan_impl *impl, int fd, int flags)
1581 {
1582  assert(impl);
1583  assert(!(flags & ~IO_CAN_BUS_FLAG_MASK));
1584 
1585  struct sllist read_queue, write_queue, confirm_queue;
1586  sllist_init(&read_queue);
1587  sllist_init(&write_queue);
1588  sllist_init(&confirm_queue);
1589 
1590 #if !LELY_NO_THREADS
1591  pthread_mutex_lock(&impl->mtx);
1592 #endif
1593 
1594  if (impl->events) {
1595  impl->events = 0;
1596  // Stop monitoring I/O events.
1597  io_poll_watch(impl->poll, impl->fd, impl->events, &impl->watch);
1598  }
1599 
1600 #if !LELY_NO_THREADS
1601  pthread_mutex_lock(&impl->c_mtx);
1602 #endif
1603  spscring_c_abort_wait(&impl->rxring);
1604  // Clear the receive queue.
1605  size_t n = SIZE_MAX;
1606  spscring_c_alloc(&impl->rxring, &n);
1607  if (n)
1608  spscring_c_commit(&impl->rxring, n);
1609 #if !LELY_NO_THREADS
1610  pthread_mutex_unlock(&impl->c_mtx);
1611 #endif
1612 
1613  int tmp = impl->fd;
1614  impl->fd = fd;
1615  fd = tmp;
1616 
1617  impl->flags = flags;
1618 
1619  // Cancel pending operations.
1620  sllist_append(&read_queue, &impl->read_queue);
1621  sllist_append(&write_queue, &impl->write_queue);
1622  sllist_append(&confirm_queue, &impl->confirm_queue);
1623 
1624  // Mark the ongoing write operation as canceled, if necessary.
1625  impl->current_write = NULL;
1626 
1627 #if !LELY_NO_THREADS
1628  pthread_mutex_unlock(&impl->mtx);
1629 #endif
1630 
1631  io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
1632  io_can_chan_write_queue_post(&write_queue, ECANCELED);
1633  io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
1634 
1635  return fd;
1636 }
1637 
1638 #endif // !LELY_NO_STDIO && __linux__
@ CAN_FLAG_FDF
The FD Format (FDF) flag, formerly known as Extended Data Length (EDL).
Definition: msg.h:54
@ CAN_FLAG_BRS
The Bit Rate Switch (BRS) flag (only available in CAN FD format frames).
Definition: msg.h:62
This is the internal header file of the SocketCAN rtnetlink attributes functions.
void io_can_chan_destroy(io_can_chan_t *chan)
Destroys a CAN channel.
Definition: can_chan.c:409
int io_can_chan_get_handle(const io_can_chan_t *chan)
Returns the SocketCAN file descriptor associated with a CAN channel, or -1 if the channel is closed.
Definition: can_chan.c:418
int io_can_chan_close(io_can_chan_t *chan)
Closes the SocketCAN file descriptor associated with a CAN channel.
Definition: can_chan.c:575
#define LELY_IO_CAN_RXLEN
The default SocketCAN receive queue length (in number of CAN frames).
Definition: can_chan.c:57
int io_can_chan_open(io_can_chan_t *chan, const io_can_ctrl_t *ctrl, int flags)
Opens a CAN channel.
Definition: can_chan.c:433
int io_can_chan_release(io_can_chan_t *chan)
Dissociates and returns the SocketCAN file descriptor from a CAN channel.
Definition: can_chan.c:561
int io_can_chan_assign(io_can_chan_t *chan, int fd)
Assigns an existing SocketCAN file descriptor to a CAN channel.
Definition: can_chan.c:503
io_can_chan_t * io_can_chan_create(io_poll_t *poll, ev_exec_t *exec, size_t rxlen)
Creates a new CAN channel.
Definition: can_chan.c:382
int io_can_chan_is_open(const io_can_chan_t *chan)
Returns 1 if the CAN channel is open and 0 if not.
Definition: can_chan.c:569
This is the internal header file of the SocketCAN error frame conversion functions.
This is the internal header file of the SocketCAN CAN frame conversion functions.
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
This header file is part of the utilities library; it contains the diagnostic declarations.
@ DIAG_WARNING
A warning.
Definition: diag.h:55
void diag(enum diag_severity severity, int errc, const char *format,...)
Emits a diagnostic message.
Definition: diag.c:171
@ IO_EVENT_IN
Data (other than priority data) MAY be read without blocking.
Definition: event.h:35
@ IO_EVENT_OUT
Data (both normal and priority data) MAY be written without blocking.
Definition: event.h:46
@ IO_EVENT_ERR
An error has occurred. This event is always reported.
Definition: event.h:48
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
ssize_t io_fd_sendmsg(int fd, const struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX sendmsg(fd, msg, flags | MSG_NOSIGNAL), except that if fd is non-blocking (or the...
Definition: fd.c:116
ssize_t io_fd_recvmsg(int fd, struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX recvmsg(fd, msg, flags), except that if fd is non-blocking (or the implementation...
Definition: fd.c:80
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
struct io_can_chan_write * io_can_chan_write_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel write operation from a pointer to its completion task.
Definition: can.c:99
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:93
const struct io_can_ctrl_vtbl *const io_can_ctrl_t
An abstract CAN controller.
Definition: can.h:56
@ IO_CAN_BUS_FLAG_BRS
Bit Rate Switch support is enabled.
Definition: can.h:44
@ IO_CAN_BUS_FLAG_FDF
FD Format (formerly Extended Data Length) support is enabled.
Definition: can.h:42
@ IO_CAN_BUS_FLAG_ERR
Reception of error frames is enabled.
Definition: can.h:39
#define LELY_IO_RX_TIMEOUT
The default timeout (in milliseconds) for I/O read operations.
Definition: io2.h:31
#define LELY_IO_TX_TIMEOUT
The default timeout (in milliseconds) for I/O write operations.
Definition: io2.h:36
This header file is part of the I/O library; it contains the CAN bus declarations for Linux.
unsigned int io_can_ctrl_get_index(const io_can_ctrl_t *ctrl)
Returns the interface index of a CAN controller.
Definition: can_ctrl.c:191
int io_can_ctrl_get_flags(const io_can_ctrl_t *ctrl)
Returns the flags specifying which CAN bus features are enabled.
Definition: can_ctrl.c:199
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
int can_msg_cmp(const void *p1, const void *p2)
Compares two CAN or CAN FD format frames.
Definition: msg.c:30
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
This header file is part of the I/O library; it contains the I/O polling declarations for POSIX platf...
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition: poll.c:275
#define IO_POLL_WATCH_INIT(func)
The static initializer for io_poll_watch.
Definition: poll.h:65
int io_poll_watch(io_poll_t *poll, io_handle_t handle, struct io_event *event, int keep)
Registers an I/O device with an I/O polling interface and instructs it to watch for certain events.
Definition: poll.c:252
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
void sllist_push_front(struct sllist *list, struct slnode *node)
Pushes a node to the front of a singly-linked list.
Definition: sllist.h:221
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
struct slnode * sllist_first(const struct sllist *list)
Returns a pointer to the first node in a singly-linked list.
Definition: sllist.h:271
int can_msg2canfd_frame(const struct can_msg *src, struct canfd_frame *dst)
Converts a can_msg frame to a SocketCAN CAN FD frame.
Definition: can_msg.h:141
int can_frame2can_msg(const struct can_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN frame to a can_msg frame.
Definition: can_msg.h:51
int canfd_frame2can_msg(const struct canfd_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN FD frame to a can_msg frame.
Definition: can_msg.h:112
int can_msg2can_frame(const struct can_msg *src, struct can_frame *dst)
Converts a can_msg frame to a SocketCAN CAN frame.
Definition: can_msg.h:80
This header file is part of the utilities library; it contains the single-producer,...
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:262
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:327
size_t spscring_p_capacity(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer,...
Definition: spscring.c:65
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition: spscring.c:138
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition: spscring.c:302
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:98
This is the internal header file of the Windows-specific I/O declarations.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
An I/O polling interface.
Definition: poll.c:51
A CAN error frame.
Definition: err.h:28
A CAN or CAN FD format frame.
Definition: msg.h:87
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI...
Definition: msg.h:94
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
The implementation of a CAN channel.
Definition: can_chan.c:133
struct ev_task read_task
The task responsible for initiating read operations.
Definition: can_chan.c:155
int fd
The SocketCAN file descriptor.
Definition: can_chan.c:174
struct ev_task write_task
The task responsible for initiating write operations.
Definition: can_chan.c:157
struct io_can_frame * rxbuf
The receive queue.
Definition: can_chan.c:165
struct io_svc svc
The I/O service representing the channel.
Definition: can_chan.c:145
struct sllist read_queue
The queue containing pending read operations.
Definition: can_chan.c:188
struct ev_task rxbuf_task
The task responsible for filling the receive queue.
Definition: can_chan.c:153
io_poll_t * poll
A pointer to the polling instance used to watch for I/O events.
Definition: can_chan.c:143
unsigned write_posted
A flag indicating whether write_task has been posted to exec.
Definition: can_chan.c:186
int flags
The flags with which fd has been opened.
Definition: can_chan.c:176
unsigned read_posted
A flag indicating whether read_task has been posted to exec.
Definition: can_chan.c:184
pthread_mutex_t mtx
The mutex protecting the file descriptor, the flags and the queues of pending operations.
Definition: can_chan.c:171
struct sllist write_queue
The queue containing pending write operations.
Definition: can_chan.c:190
unsigned shutdown
A flag indicating whether the I/O service has been shut down.
Definition: can_chan.c:180
ev_exec_t * exec
A pointer to the executor used to execute all I/O tasks.
Definition: can_chan.c:149
const struct io_dev_vtbl * dev_vptr
A pointer to the virtual table for the I/O device interface.
Definition: can_chan.c:135
struct sllist confirm_queue
The queue containing write operations waiting to be confirmed.
Definition: can_chan.c:192
struct ev_task * current_write
The write operation currently being executed.
Definition: can_chan.c:194
pthread_mutex_t c_mtx
The mutex protecting the receive queue consumer.
Definition: can_chan.c:160
struct io_poll_watch watch
The object used to monitor the file descriptor for I/O events.
Definition: can_chan.c:151
const struct io_can_chan_vtbl * chan_vptr
A pointer to the virtual table for the CAN channel interface.
Definition: can_chan.c:137
io_ctx_t * ctx
A pointer to the I/O context with which the channel is registered.
Definition: can_chan.c:147
int events
The I/O events currently being monitored by poll for fd.
Definition: can_chan.c:178
struct spscring rxring
The ring buffer used to control the receive queue.
Definition: can_chan.c:163
unsigned rxbuf_posted
A flag indicating whether rxbuf_task has been posted to exec.
Definition: can_chan.c:182
int result
The result of the read operation: 1 if a CAN frame is received, 0 if an error frame is received,...
Definition: can.h:68
int errc
The error number, obtained as if by get_errc(), if result is -1.
Definition: can.h:70
A CAN channel read operation.
Definition: can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
struct io_can_chan_read_result r
The result of the read operation.
Definition: can.h:100
struct timespec * tp
The address at which to store the system time at which the CAN frame or CAN error frame was received.
Definition: can.h:93
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
A CAN channel write operation.
Definition: can.h:110
const struct can_msg * msg
A pointer to the CAN frame to be written.
Definition: can.h:116
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the write operation.
Definition: can.h:121
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: can.h:126
Definition: ctx.c:38
An object representing a file descriptor being monitored for I/O events.
Definition: poll.h:56
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
struct slnode * next
A pointer to the next node in the list.
Definition: sllist.h:42
A single-producer, single-consumer ring buffer.
Definition: spscring.h:63
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
size_t ev_task_queue_post(struct sllist *queue)
Post the tasks in queue to their respective executors and invokes ev_exec_on_task_fini() for each of ...
Definition: task.c:38
This header file is part of the C11 and POSIX compatibility library; it includes <unistd....
This header file is part of the utilities library; it contains the time function declarations.