Lely core libraries  2.2.5
can_chan.c
Go to the documentation of this file.
1 
24 #include "io.h"
25 
26 #ifdef __linux__
27 
28 #include "../can.h"
29 #include <lely/io2/ctx.h>
30 #include <lely/io2/linux/can.h>
31 #include <lely/io2/posix/poll.h>
32 #include <lely/util/diag.h>
33 #include <lely/util/spscring.h>
34 #include <lely/util/time.h>
35 #include <lely/util/util.h>
36 
37 #include <assert.h>
38 #include <errno.h>
39 #include <stdlib.h>
40 
41 #if !LELY_NO_THREADS
42 #include <pthread.h>
43 #endif
44 #include <unistd.h>
45 
46 #include <linux/can/raw.h>
47 #include <linux/sockios.h>
48 #include <sys/ioctl.h>
49 
50 #include "../posix/fd.h"
51 #include "can_attr.h"
52 #include "can_err.h"
53 #include "can_msg.h"
54 
55 #ifndef LELY_IO_CAN_RXLEN
56 #define LELY_IO_CAN_RXLEN 1024
58 #endif
59 
60 struct io_can_frame {
61 #if LELY_NO_CANFD
62  struct can_frame frame;
63 #else
64  struct canfd_frame frame;
65 #endif
66  size_t nbytes;
67  struct timespec ts;
68 };
69 
70 static int io_can_fd_set_default(int fd);
71 #if LELY_NO_CANFD
72 static int io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes,
73  int *pflags, struct timespec *tp, int timeout);
74 #else
75 static int io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes,
76  int *pflags, struct timespec *tp, int timeout);
77 #endif
78 #if LELY_NO_CANFD
79 static int io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
80  int dontwait);
81 #else
82 static int io_can_fd_write(int fd, const struct canfd_frame *frame,
83  size_t nbytes, int timeout);
84 #endif
85 static int io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout);
86 
87 static io_ctx_t *io_can_chan_impl_dev_get_ctx(const io_dev_t *dev);
88 static ev_exec_t *io_can_chan_impl_dev_get_exec(const io_dev_t *dev);
89 static size_t io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task);
90 static size_t io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task);
91 
92 // clang-format off
93 static const struct io_dev_vtbl io_can_chan_impl_dev_vtbl = {
94  &io_can_chan_impl_dev_get_ctx,
95  &io_can_chan_impl_dev_get_exec,
96  &io_can_chan_impl_dev_cancel,
97  &io_can_chan_impl_dev_abort
98 };
99 // clang-format on
100 
101 static io_dev_t *io_can_chan_impl_get_dev(const io_can_chan_t *chan);
102 static int io_can_chan_impl_get_flags(const io_can_chan_t *chan);
103 static int io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
104  struct can_err *err, struct timespec *tp, int timeout);
105 static void io_can_chan_impl_submit_read(
106  io_can_chan_t *chan, struct io_can_chan_read *read);
107 static int io_can_chan_impl_write(
108  io_can_chan_t *chan, const struct can_msg *msg, int timeout);
109 static void io_can_chan_impl_submit_write(
110  io_can_chan_t *chan, struct io_can_chan_write *write);
111 
112 // clang-format off
113 static const struct io_can_chan_vtbl io_can_chan_impl_vtbl = {
114  &io_can_chan_impl_get_dev,
115  &io_can_chan_impl_get_flags,
116  &io_can_chan_impl_read,
117  &io_can_chan_impl_submit_read,
118  &io_can_chan_impl_write,
119  &io_can_chan_impl_submit_write
120 };
121 // clang-format on
122 
123 static void io_can_chan_impl_svc_shutdown(struct io_svc *svc);
124 
125 // clang-format off
126 static const struct io_svc_vtbl io_can_chan_impl_svc_vtbl = {
127  NULL,
128  &io_can_chan_impl_svc_shutdown
129 };
130 // clang-format on
131 
135  const struct io_dev_vtbl *dev_vptr;
145  struct io_svc svc;
151  struct io_poll_watch watch;
153  struct ev_task rxbuf_task;
155  struct ev_task read_task;
157  struct ev_task write_task;
158 #if !LELY_NO_THREADS
159  pthread_mutex_t c_mtx;
161 #endif
162  struct spscring rxring;
166 #if !LELY_NO_THREADS
167 
171  pthread_mutex_t mtx;
172 #endif
173  int fd;
176  int flags;
178  int events;
180  unsigned shutdown : 1;
182  unsigned rxbuf_posted : 1;
184  unsigned read_posted : 1;
186  unsigned write_posted : 1;
188  struct sllist read_queue;
190  struct sllist write_queue;
192  struct sllist confirm_queue;
195 };
196 
197 static void io_can_chan_impl_watch_func(
198  struct io_poll_watch *watch, int events);
199 static void io_can_chan_impl_rxbuf_task_func(struct ev_task *task);
200 static void io_can_chan_impl_read_task_func(struct ev_task *task);
201 static void io_can_chan_impl_write_task_func(struct ev_task *task);
202 
203 static inline struct io_can_chan_impl *io_can_chan_impl_from_dev(
204  const io_dev_t *dev);
205 static inline struct io_can_chan_impl *io_can_chan_impl_from_chan(
206  const io_can_chan_t *chan);
207 static inline struct io_can_chan_impl *io_can_chan_impl_from_svc(
208  const struct io_svc *svc);
209 
210 static void io_can_chan_impl_c_signal(struct spscring *ring, void *arg);
211 
212 static void io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
213  struct sllist *read_queue, struct sllist *write_queue,
214  struct sllist *confirm_queue, struct ev_task *task);
215 
216 static void io_can_chan_impl_do_read(struct io_can_chan_impl *impl,
217  struct sllist *queue, int *pwouldblock);
218 static void io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl,
219  struct sllist *queue, const struct can_msg *msg);
220 
221 static size_t io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl);
222 
223 static int io_can_chan_impl_set_fd(
224  struct io_can_chan_impl *impl, int fd, int flags);
225 
226 void *
227 io_can_chan_alloc(void)
228 {
229  struct io_can_chan_impl *impl = malloc(sizeof(*impl));
230  // cppcheck-suppress memleak symbolName=impl
231  return impl ? &impl->chan_vptr : NULL;
232 }
233 
234 void
235 io_can_chan_free(void *ptr)
236 {
237  if (ptr)
238  free(io_can_chan_impl_from_chan(ptr));
239 }
240 
242 io_can_chan_init(io_can_chan_t *chan, io_poll_t *poll, ev_exec_t *exec,
243  size_t rxlen)
244 {
245  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
246  io_ctx_t *ctx = poll ? io_poll_get_ctx(poll) : NULL;
247 
248  if (!rxlen)
249  rxlen = LELY_IO_CAN_RXLEN;
250 
251  int errsv = 0;
252 
253  impl->dev_vptr = &io_can_chan_impl_dev_vtbl;
254  impl->chan_vptr = &io_can_chan_impl_vtbl;
255 
256  impl->poll = poll;
257 
258  impl->svc = (struct io_svc)IO_SVC_INIT(&io_can_chan_impl_svc_vtbl);
259  impl->ctx = ctx;
260 
261  impl->exec = exec;
262 
263  impl->watch = (struct io_poll_watch)IO_POLL_WATCH_INIT(
264  &io_can_chan_impl_watch_func);
265 
266  impl->rxbuf_task = (struct ev_task)EV_TASK_INIT(
267  impl->exec, &io_can_chan_impl_rxbuf_task_func);
268  impl->read_task = (struct ev_task)EV_TASK_INIT(
269  impl->exec, &io_can_chan_impl_read_task_func);
270  impl->write_task = (struct ev_task)EV_TASK_INIT(
271  impl->exec, &io_can_chan_impl_write_task_func);
272 
273 #if !LELY_NO_THREADS
274  if ((errsv = pthread_mutex_init(&impl->c_mtx, NULL)))
275  goto error_init_c_mtx;
276 #endif
277 
278  spscring_init(&impl->rxring, rxlen);
279  impl->rxbuf = calloc(rxlen, sizeof(struct io_can_frame));
280  if (!impl->rxbuf) {
281  errsv = errno;
282  goto error_alloc_rxbuf;
283  }
284 
285 #if !LELY_NO_THREADS
286  if ((errsv = pthread_mutex_init(&impl->mtx, NULL)))
287  goto error_init_mtx;
288 #endif
289 
290  impl->fd = -1;
291  impl->flags = 0;
292  impl->events = 0;
293 
294  impl->shutdown = 0;
295  impl->rxbuf_posted = 0;
296  impl->read_posted = 0;
297  impl->write_posted = 0;
298 
299  sllist_init(&impl->read_queue);
300  sllist_init(&impl->write_queue);
301  sllist_init(&impl->confirm_queue);
302  impl->current_write = NULL;
303 
304  if (impl->ctx)
305  io_ctx_insert(impl->ctx, &impl->svc);
306 
307  return chan;
308 
309 #if !LELY_NO_THREADS
310  // pthread_mutex_destroy(&impl->mtx);
311 error_init_mtx:
312 #endif
313  free(impl->rxbuf);
314 error_alloc_rxbuf:
315 #if !LELY_NO_THREADS
316  pthread_mutex_destroy(&impl->c_mtx);
317 error_init_c_mtx:
318 #endif
319  errno = errsv;
320  return NULL;
321 }
322 
323 void
324 io_can_chan_fini(io_can_chan_t *chan)
325 {
326  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
327 
328  if (impl->ctx)
329  io_ctx_remove(impl->ctx, &impl->svc);
330  // Cancel all pending operations.
331  io_can_chan_impl_svc_shutdown(&impl->svc);
332 
333 #if !LELY_NO_THREADS
334  // Abort any consumer wait operation running in a task.
335  pthread_mutex_lock(&impl->c_mtx);
337  pthread_mutex_unlock(&impl->c_mtx);
338 
339  int warning = 0;
340  pthread_mutex_lock(&impl->mtx);
341  // If necessary, busy-wait until io_can_chan_impl_rxbuf_task_func(),
342  // io_can_chan_impl_read_task_func() and
343  // io_can_chan_impl_write_task_func() complete.
344  while (impl->rxbuf_posted || impl->read_posted || impl->write_posted) {
345  if (io_can_chan_impl_do_abort_tasks(impl))
346  continue;
347  pthread_mutex_unlock(&impl->mtx);
348  if (!warning) {
349  warning = 1;
350  diag(DIAG_WARNING, 0,
351  "io_can_chan_fini() invoked with pending operations");
352  }
353  sched_yield();
354  pthread_mutex_lock(&impl->mtx);
355  }
356  pthread_mutex_unlock(&impl->mtx);
357 #endif
358 
359  // Close the socket.
360  if (impl->fd != -1) {
361  if (impl->events)
362  io_poll_watch(impl->poll, impl->fd, 0, &impl->watch);
363  close(impl->fd);
364  }
365 
366 #if !LELY_NO_THREADS
367  pthread_mutex_destroy(&impl->mtx);
368 #endif
369 
370  free(impl->rxbuf);
371 
372 #if !LELY_NO_THREADS
373  pthread_mutex_destroy(&impl->c_mtx);
374 #endif
375 }
376 
378 io_can_chan_create(io_poll_t *poll, ev_exec_t *exec, size_t rxlen)
379 {
380  int errsv = 0;
381 
382  io_can_chan_t *chan = io_can_chan_alloc();
383  if (!chan) {
384  errsv = errno;
385  goto error_alloc;
386  }
387 
388  io_can_chan_t *tmp = io_can_chan_init(chan, poll, exec, rxlen);
389  if (!tmp) {
390  errsv = errno;
391  goto error_init;
392  }
393  chan = tmp;
394 
395  return chan;
396 
397 error_init:
398  io_can_chan_free((void *)chan);
399 error_alloc:
400  errno = errsv;
401  return NULL;
402 }
403 
404 void
406 {
407  if (chan) {
408  io_can_chan_fini(chan);
409  io_can_chan_free((void *)chan);
410  }
411 }
412 
413 int
415 {
416  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
417 
418 #if !LELY_NO_THREADS
419  pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
420 #endif
421  int fd = impl->fd;
422 #if !LELY_NO_THREADS
423  pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
424 #endif
425  return fd;
426 }
427 
428 int
430 {
431  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
432 
433  if (flags & ~(io_can_ctrl_get_flags(ctrl) | IO_CAN_BUS_FLAG_ERR)) {
434  errno = EINVAL;
435  return -1;
436  }
437 
438  int errsv = 0;
439 
440  int fd = socket(AF_CAN, SOCK_RAW | SOCK_CLOEXEC, CAN_RAW);
441  if (fd == -1) {
442  errsv = errno;
443  goto error_socket;
444  }
445 
446  struct sockaddr_can addr = { .can_family = AF_CAN,
447  .can_ifindex = io_can_ctrl_get_index(ctrl) };
448 
449  if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
450  errsv = errno;
451  goto error_bind;
452  }
453 
454  if (flags & IO_CAN_BUS_FLAG_ERR) {
455  can_err_mask_t optval = CAN_ERR_MASK;
456  // clang-format off
457  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
458  sizeof(optval)) == -1) {
459  // clang-format on
460  errsv = errno;
461  goto error_setsockopt;
462  }
463  }
464 
465 #if !LELY_NO_CANFD
466  if (flags & IO_CAN_BUS_FLAG_FDF) {
467  int optval = 1;
468  // clang-format off
469  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
470  sizeof(optval)) == -1) {
471  // clang-format on
472  errsv = errno;
473  goto error_setsockopt;
474  }
475  }
476 #endif
477 
478  if (io_can_fd_set_default(fd) == -1) {
479  errsv = errno;
480  goto error_set_default;
481  }
482 
483  fd = io_can_chan_impl_set_fd(impl, fd, flags);
484  if (fd != -1)
485  close(fd);
486 
487  return 0;
488 
489 error_set_default:
490 error_setsockopt:
491 error_bind:
492  close(fd);
493 error_socket:
494  errno = errsv;
495  return -1;
496 }
497 
498 int
500 {
501  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
502 
503  struct sockaddr_can addr = { .can_family = AF_UNSPEC };
504  socklen_t addrlen = sizeof(addr);
505  if (getsockname(fd, (struct sockaddr *)&addr, &addrlen) == -1)
506  return -1;
507  if (addrlen < sizeof(addr) || addr.can_family != AF_CAN) {
508  errno = ENODEV;
509  return -1;
510  }
511  unsigned int ifindex = addr.can_ifindex;
512 
513  struct io_can_attr attr = IO_CAN_ATTR_INIT;
514  if (io_can_attr_get(&attr, ifindex) == -1)
515  return -1;
516  int flags = attr.flags;
517 
518  {
519  can_err_mask_t optval = 0;
520  socklen_t optlen = sizeof(optval);
521  // clang-format off
522  if (getsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
523  &optlen) == -1)
524  // clang-format on
525  return -1;
526  if (optval & CAN_ERR_MASK)
527  flags |= IO_CAN_BUS_FLAG_ERR;
528  }
529 
530 #if !LELY_NO_CANFD
531  // Check if CAN FD frames are allowed. If the check fails, we assume
532  // they are not.
533  {
534  int errsv = errno;
535  int optval = 0;
536  socklen_t optlen = sizeof(optval);
537  // clang-format off
538  if (!getsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
539  &optlen) && !optval)
540  // clang-format on
542  errno = errsv;
543  }
544 #endif
545 
546  if (io_can_fd_set_default(fd) == -1)
547  return -1;
548 
549  fd = io_can_chan_impl_set_fd(impl, fd, flags);
550  if (fd != -1)
551  close(fd);
552 
553  return 0;
554 }
555 
556 int
558 {
559  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
560 
561  return io_can_chan_impl_set_fd(impl, -1, 0);
562 }
563 
564 int
566 {
567  return io_can_chan_get_handle(chan) != -1;
568 }
569 
570 int
572 {
573  int fd = io_can_chan_release(chan);
574  return fd != -1 ? close(fd) : 0;
575 }
576 
577 static int
578 io_can_fd_set_default(int fd)
579 {
580  int optval;
581 
582  // Enable local loopback.
583  optval = 1;
584  // clang-format off
585  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_LOOPBACK, &optval,
586  sizeof(optval)) == -1)
587  // clang-format on
588  return -1;
589 
590  // Enable the reception of CAN frames sent by this socket so we can
591  // check for successful transmission.
592  optval = 1;
593  // clang-format off
594  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_RECV_OWN_MSGS, &optval,
595  sizeof(optval)) == -1)
596  // clang-format on
597  return -1;
598 
599  // Set the size of the send buffer to its minimum value. This causes
600  // write operations to block (or return EAGAIN) instead of returning
601  // ENOBUFS.
602  optval = 0;
603  // clang-format off
604  if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval))
605  == -1)
606  // clang-format on
607  return -1;
608 
609  return 0;
610 }
611 
612 static int
613 #if LELY_NO_CANFD
614 io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes, int *pflags,
615  struct timespec *tp, int timeout)
616 #else
617 io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes, int *pflags,
618  struct timespec *tp, int timeout)
619 #endif
620 {
621  struct iovec iov = { .iov_base = (void *)frame,
622  .iov_len = sizeof(*frame) };
623  struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
624 
625  ssize_t result;
626  for (;;) {
627  result = io_fd_recvmsg(fd, &msg, 0, timeout);
628  if (result < 0)
629  return result;
630 #if LELY_NO_CANFD
631  if (result == CAN_MTU)
632 #else
633  if (result == CAN_MTU || result == CANFD_MTU)
634 #endif
635  break;
636  if (timeout > 0)
637  timeout = 0;
638  }
639 
640  if (pnbytes)
641  *pnbytes = result;
642 
643  if (pflags)
644  *pflags = msg.msg_flags;
645 
646  if (tp) {
647  if (msg.msg_flags & MSG_CONFIRM) {
648  // Ignore the timestamp for write confirmations.
649  *tp = (struct timespec){ 0, 0 };
650  } else {
651  struct timeval tv = { 0, 0 };
652  if (ioctl(fd, SIOCGSTAMP, &tv) == -1)
653  return -1;
654  tp->tv_sec = tv.tv_sec;
655  tp->tv_nsec = tv.tv_usec * 1000;
656  }
657  }
658 
659  return 0;
660 }
661 
662 static int
663 #if LELY_NO_CANFD
664 io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
665  int timeout)
666 #else
667 io_can_fd_write(int fd, const struct canfd_frame *frame, size_t nbytes,
668  int timeout)
669 #endif
670 {
671  struct iovec iov = { .iov_base = (void *)frame, .iov_len = nbytes };
672  struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
673 
674  return io_fd_sendmsg(fd, &msg, 0, timeout) > 0 ? 0 : -1;
675 }
676 
677 static int
678 io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout)
679 {
680  assert(msg);
681 
682  // Convert the frame to the SocketCAN format.
683  struct io_can_frame frame;
684 #if !LELY_NO_CANFD
685  if (msg->flags & CAN_FLAG_FDF) {
686  if (can_msg2canfd_frame(msg, &frame.frame) == -1) {
687  errno = EINVAL;
688  return -1;
689  }
690  frame.nbytes = CANFD_MTU;
691  } else {
692 #endif
693  if (can_msg2can_frame(msg, (struct can_frame *)&frame.frame)
694  == -1) {
695  errno = EINVAL;
696  return -1;
697  }
698  frame.nbytes = CAN_MTU;
699 #if !LELY_NO_CANFD
700  }
701 #endif
702 
703  return io_can_fd_write(fd, &frame.frame, frame.nbytes, timeout);
704 }
705 
706 static io_ctx_t *
707 io_can_chan_impl_dev_get_ctx(const io_dev_t *dev)
708 {
709  const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
710 
711  return impl->ctx;
712 }
713 
714 static ev_exec_t *
715 io_can_chan_impl_dev_get_exec(const io_dev_t *dev)
716 {
717  const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
718 
719  return impl->exec;
720 }
721 
722 static size_t
723 io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task)
724 {
725  struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
726 
727  size_t n = 0;
728 
729  struct sllist read_queue, write_queue, confirm_queue;
730  sllist_init(&read_queue);
731  sllist_init(&write_queue);
732  sllist_init(&confirm_queue);
733 
734 #if !LELY_NO_THREADS
735  pthread_mutex_lock(&impl->mtx);
736 #endif
737  io_can_chan_impl_do_pop(
738  impl, &read_queue, &write_queue, &confirm_queue, task);
739  // Mark the ongoing write operation as canceled, if necessary.
740  if (impl->current_write && (!task || task == impl->current_write)) {
741  impl->current_write = NULL;
742  n++;
743  }
744 #if !LELY_NO_THREADS
745  pthread_mutex_unlock(&impl->mtx);
746 #endif
747 
748  size_t nread = io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
749  n = n < SIZE_MAX - nread ? n + nread : SIZE_MAX;
750  size_t nwrite = io_can_chan_write_queue_post(&write_queue, ECANCELED);
751  n = n < SIZE_MAX - nwrite ? n + nwrite : SIZE_MAX;
752  size_t nconfirm =
753  io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
754  n = n < SIZE_MAX - nconfirm ? n + nconfirm : SIZE_MAX;
755 
756  return n;
757 }
758 
759 static size_t
760 io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task)
761 {
762  struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
763 
764  struct sllist queue;
765  sllist_init(&queue);
766 
767 #if !LELY_NO_THREADS
768  pthread_mutex_lock(&impl->mtx);
769 #endif
770  io_can_chan_impl_do_pop(impl, &queue, &queue, &queue, task);
771 #if !LELY_NO_THREADS
772  pthread_mutex_unlock(&impl->mtx);
773 #endif
774 
775  return ev_task_queue_abort(&queue);
776 }
777 static io_dev_t *
778 io_can_chan_impl_get_dev(const io_can_chan_t *chan)
779 {
780  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
781 
782  return &impl->dev_vptr;
783 }
784 
785 static int
786 io_can_chan_impl_get_flags(const io_can_chan_t *chan)
787 {
788  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
789 
790 #if !LELY_NO_THREADS
791  pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
792 #endif
793  int flags = impl->flags;
794 #if !LELY_NO_THREADS
795  pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
796 #endif
797  return flags;
798 }
799 
800 static int
801 io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
802  struct can_err *err, struct timespec *tp, int timeout)
803 {
804  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
805 
806  struct io_can_frame frame_;
807  struct io_can_frame *frame = &frame_;
808 #if !LELY_NO_THREADS
809  pthread_mutex_lock(&impl->c_mtx);
810 #endif
811  size_t n = 0;
812  for (;;) {
813  // Check if a frame is available in the receive queue.
814  n = 1;
815  size_t i = spscring_c_alloc(&impl->rxring, &n);
816  if (n) {
817  frame = &impl->rxbuf[i];
818  break;
819  }
820 #if !LELY_NO_THREADS
821  pthread_mutex_unlock(&impl->c_mtx);
822  pthread_mutex_lock(&impl->mtx);
823 #endif
824  // If not, read a frame directly.
825  int fd = impl->fd;
826 #if !LELY_NO_THREADS
827  pthread_mutex_unlock(&impl->mtx);
828 #endif
829  int flags = 0;
830  // clang-format off
831  if (io_can_fd_read(fd, &frame->frame, &frame->nbytes, &flags,
832  &frame->ts, timeout) < 0)
833  // clang-format on
834  return -1;
835  // Process the frame unless it is a write confirmation.
836  if (!(flags & MSG_CONFIRM))
837  break;
838  // Convert the frame from the SocketCAN format.
839  void *src = &frame->frame;
840  struct can_msg msg;
841 #if !LELY_NO_CANFD
842  if (frame->nbytes == CANFD_MTU)
843  canfd_frame2can_msg(src, &msg);
844  else
845 #endif
846  can_frame2can_msg(src, &msg);
847  // Process the write confirmation.
848  struct sllist queue;
849  sllist_init(&queue);
850 #if !LELY_NO_THREADS
851  pthread_mutex_lock(&impl->mtx);
852 #endif
853  io_can_chan_impl_do_confirm(impl, &queue, &msg);
854 #if !LELY_NO_THREADS
855  pthread_mutex_unlock(&impl->mtx);
856 #endif
857  ev_task_queue_post(&queue);
858  // Since the timeout is relative, we can only use a positive
859  // value once.
860  if (timeout > 0)
861  timeout = 0;
862 #if !LELY_NO_THREADS
863  pthread_mutex_lock(&impl->c_mtx);
864 #endif
865  }
866  // Parse the frame.
867  void *data = &frame->frame;
868  int is_err = can_frame2can_err(data, err);
869  if (!is_err && msg) {
870 #if !LELY_NO_CANFD
871  if (frame->nbytes == CANFD_MTU)
872  canfd_frame2can_msg(data, msg);
873  else
874 #endif
875  can_frame2can_msg(data, msg);
876  }
877  if (tp)
878  *tp = frame->ts;
879  // Remove the frame from the receive queue, if necessary.
880  if (n) {
881  spscring_c_commit(&impl->rxring, n);
882 #if !LELY_NO_THREADS
883  pthread_mutex_unlock(&impl->c_mtx);
884 #endif
885  }
886 
887  return is_err == -1 ? -1 : !is_err;
888 }
889 
890 static void
891 io_can_chan_impl_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
892 {
893  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
894  assert(read);
895  struct ev_task *task = &read->task;
896 
897  if (!task->exec)
898  task->exec = impl->exec;
899  assert(task->exec);
900  ev_exec_on_task_init(task->exec);
901 
902 #if !LELY_NO_THREADS
903  pthread_mutex_lock(&impl->mtx);
904 #endif
905  if (impl->shutdown) {
906 #if !LELY_NO_THREADS
907  pthread_mutex_unlock(&impl->mtx);
908 #endif
909  io_can_chan_read_post(read, -1, ECANCELED);
910  } else {
911  int post_read = !impl->read_posted
912  && sllist_empty(&impl->read_queue);
913  sllist_push_back(&impl->read_queue, &task->_node);
914  if (post_read)
915  impl->read_posted = 1;
916 #if !LELY_NO_THREADS
917  pthread_mutex_unlock(&impl->mtx);
918 #endif
919  assert(impl->read_task.exec);
920  if (post_read)
921  ev_exec_post(impl->read_task.exec, &impl->read_task);
922  }
923 }
924 
925 static int
926 io_can_chan_impl_write(
927  io_can_chan_t *chan, const struct can_msg *msg, int timeout)
928 {
929  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
930 
931 #if !LELY_NO_CANFD
932  int flags = 0;
933  if (msg->flags & CAN_FLAG_FDF)
934  flags |= IO_CAN_BUS_FLAG_FDF;
935  if (msg->flags & CAN_FLAG_BRS)
936  flags |= IO_CAN_BUS_FLAG_BRS;
937 #endif
938 
939 #if !LELY_NO_THREADS
940  pthread_mutex_lock(&impl->mtx);
941 #endif
942 #if !LELY_NO_CANFD
943  if ((flags & impl->flags) != flags) {
944 #if !LELY_NO_THREADS
945  pthread_mutex_unlock(&impl->mtx);
946 #endif
947  errno = EINVAL;
948  return -1;
949  }
950 #endif
951  int fd = impl->fd;
952 #if !LELY_NO_THREADS
953  pthread_mutex_unlock(&impl->mtx);
954 #endif
955 
956  return io_can_fd_write_msg(fd, msg, timeout);
957 }
958 
959 static void
960 io_can_chan_impl_submit_write(
961  io_can_chan_t *chan, struct io_can_chan_write *write)
962 {
963  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
964  assert(write);
965  assert(write->msg);
966  struct ev_task *task = &write->task;
967 
968 #if !LELY_NO_CANFD
969  int flags = 0;
970  if (write->msg->flags & CAN_FLAG_FDF)
971  flags |= IO_CAN_BUS_FLAG_FDF;
972  if (write->msg->flags & CAN_FLAG_BRS)
973  flags |= IO_CAN_BUS_FLAG_BRS;
974 #endif
975 
976  if (!task->exec)
977  task->exec = impl->exec;
978  assert(task->exec);
979  ev_exec_on_task_init(task->exec);
980 
981 #if !LELY_NO_THREADS
982  pthread_mutex_lock(&impl->mtx);
983 #endif
984  if (impl->shutdown) {
985 #if !LELY_NO_THREADS
986  pthread_mutex_unlock(&impl->mtx);
987 #endif
988  io_can_chan_write_post(write, ECANCELED);
989 #if !LELY_NO_CANFD
990  } else if ((flags & impl->flags) != flags) {
991 #if !LELY_NO_THREADS
992  pthread_mutex_unlock(&impl->mtx);
993 #endif
994  io_can_chan_write_post(write, EINVAL);
995 #endif
996  } else {
997  int post_write = !impl->write_posted
998  && sllist_empty(&impl->write_queue);
999  sllist_push_back(&impl->write_queue, &task->_node);
1000  if (post_write)
1001  impl->write_posted = 1;
1002 #if !LELY_NO_THREADS
1003  pthread_mutex_unlock(&impl->mtx);
1004 #endif
1005  assert(impl->write_task.exec);
1006  if (post_write)
1007  ev_exec_post(impl->write_task.exec, &impl->write_task);
1008  }
1009 }
1010 
1011 static void
1012 io_can_chan_impl_svc_shutdown(struct io_svc *svc)
1013 {
1014  struct io_can_chan_impl *impl = io_can_chan_impl_from_svc(svc);
1015  io_dev_t *dev = &impl->dev_vptr;
1016 
1017 #if !LELY_NO_THREADS
1018  pthread_mutex_lock(&impl->mtx);
1019 #endif
1020  int shutdown = !impl->shutdown;
1021  impl->shutdown = 1;
1022  if (shutdown) {
1023  if (impl->events) {
1024  impl->events = 0;
1025  // Stop monitoring I/O events.
1026  io_poll_watch(impl->poll, impl->fd, impl->events,
1027  &impl->watch);
1028  }
1029  // Try to abort io_can_chan_impl_rxbuf_task_func(),
1030  // io_can_chan_impl_read_task_func() and
1031  // io_can_chan_impl_write_task_func().
1032  io_can_chan_impl_do_abort_tasks(impl);
1033  }
1034 #if !LELY_NO_THREADS
1035  pthread_mutex_unlock(&impl->mtx);
1036 #endif
1037 
1038  if (shutdown)
1039  // Cancel all pending operations.
1040  io_can_chan_impl_dev_cancel(dev, NULL);
1041 }
1042 
1043 static void
1044 io_can_chan_impl_watch_func(struct io_poll_watch *watch, int events)
1045 {
1046  assert(watch);
1047  struct io_can_chan_impl *impl =
1048  structof(watch, struct io_can_chan_impl, watch);
1049 
1050 #if !LELY_NO_THREADS
1051  pthread_mutex_lock(&impl->mtx);
1052 #endif
1053  // Continue monitoring events that are not otherwise handled.
1054  if ((events & IO_EVENT_ERR) || impl->fd == -1 || impl->shutdown) {
1055  impl->events = 0;
1056  } else if ((impl->events &= ~events) != 0) {
1057  int errsv = errno;
1058  // clang-format off
1059  if (io_poll_watch(impl->poll, impl->fd, impl->events,
1060  &impl->watch) == -1) {
1061  // clang-format on
1062  impl->events = 0;
1063  events |= IO_EVENT_ERR;
1064  }
1065  errno = errsv;
1066  }
1067 
1068  // Process incoming CAN frames.
1069  int post_rxbuf = 0;
1070  if ((events & (IO_EVENT_IN | IO_EVENT_ERR)) && !impl->shutdown) {
1071  post_rxbuf = !impl->rxbuf_posted;
1072  impl->rxbuf_posted = 1;
1073  }
1074 
1075  // Retry any pending write operations.
1076  int post_write = 0;
1077  if ((events & (IO_EVENT_OUT | IO_EVENT_ERR))
1078  && !sllist_empty(&impl->write_queue)
1079  && !impl->shutdown) {
1080  post_write = !impl->write_posted;
1081  impl->write_posted = 1;
1082  }
1083 #if !LELY_NO_THREADS
1084  pthread_mutex_unlock(&impl->mtx);
1085 #endif
1086 
1087  if (post_rxbuf)
1088  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1089  if (post_write)
1090  ev_exec_post(impl->write_task.exec, &impl->write_task);
1091 }
1092 
1093 static void
1094 io_can_chan_impl_rxbuf_task_func(struct ev_task *task)
1095 {
1096  assert(task);
1097  struct io_can_chan_impl *impl =
1098  structof(task, struct io_can_chan_impl, rxbuf_task);
1099 
1100  int errsv = errno;
1101 
1102  struct sllist queue;
1103  sllist_init(&queue);
1104 
1105  int result = 0;
1106  int errc = 0;
1107  int wouldblock = 0;
1108 
1109 #if !LELY_NO_THREADS
1110  pthread_mutex_lock(&impl->mtx);
1111 #endif
1112  // Only try to fill the receive queue if there are pending read
1113  // operations and there is an empty slot in the queue, or if there are
1114  // pending write confirmations.
1115  // clang-format off
1116  while ((!sllist_empty(&impl->read_queue)
1117  && spscring_p_capacity(&impl->rxring))
1118  || !sllist_empty(&impl->confirm_queue)) {
1119  // clang-format on
1120  int fd = impl->fd;
1121 #if !LELY_NO_THREADS
1122  pthread_mutex_unlock(&impl->mtx);
1123 #endif
1124 
1125  struct io_can_frame frame_;
1126  struct io_can_frame *frame = &frame_;
1127  // Try to obtain an empty slot in the receive queue.
1128  size_t n = 1;
1129  size_t i = spscring_p_alloc(&impl->rxring, &n);
1130  if (n)
1131  frame = &impl->rxbuf[i];
1132 
1133  // Try to read a CAN or CAN FD format frame from the CAN bus.
1134  int flags = 0;
1135  result = io_can_fd_read(fd, &frame->frame, &frame->nbytes,
1136  &flags, &frame->ts,
1137  impl->poll ? 0 : LELY_IO_RX_TIMEOUT);
1138  errc = !result ? 0 : errno;
1139  wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1140 
1141  // Convert the frame from the SocketCAN format if it is a write
1142  // confirmation.
1143  struct can_msg msg;
1144  if (!result && (flags & MSG_CONFIRM)) {
1145  void *src = &frame->frame;
1146 #if !LELY_NO_CANFD
1147  if (frame->nbytes == CANFD_MTU)
1148  canfd_frame2can_msg(src, &msg);
1149  else
1150 #endif
1151  can_frame2can_msg(src, &msg);
1152  }
1153 
1154  // Make the frame available for reading.
1155  if (!result && !(flags & MSG_CONFIRM))
1156  spscring_p_commit(&impl->rxring, n);
1157 
1158 #if !LELY_NO_THREADS
1159  pthread_mutex_lock(&impl->mtx);
1160 #endif
1161  // Process the write confirmation, if any.
1162  if (!result && flags & MSG_CONFIRM)
1163  io_can_chan_impl_do_confirm(impl, &queue, &msg);
1164 
1165  // Stop if the operation did or would block, or if an error
1166  // occurred.
1167  if (!impl->poll || result < 0)
1168  break;
1169  }
1170  // Cancel all pending read operations on error.
1171  if (result < 0 && !wouldblock) {
1172  io_can_chan_impl_do_read(impl, &queue, NULL);
1173  while ((task = ev_task_from_node(
1174  sllist_pop_front(&impl->read_queue)))) {
1175  struct io_can_chan_read *read =
1177  read->r.result = result;
1178  read->r.errc = errc;
1179  sllist_push_back(&queue, &task->_node);
1180  }
1181  }
1182  // clang-format off
1183  int post_rxbuf = !(sllist_empty(&impl->read_queue)
1184  && sllist_empty(&impl->confirm_queue))
1185  && impl->fd != -1 && !impl->shutdown;
1186  // clang-format on
1187  // If a read operation would block, start monitoring the file descriptor
1188  // for I/O events.
1189  if (post_rxbuf && impl->poll && wouldblock) {
1190  int events = impl->events | IO_EVENT_IN;
1191  // clang-format off
1192  if (!io_poll_watch(impl->poll, impl->fd, events,
1193  &impl->watch)) {
1194  // clang-format on
1195  impl->events = events;
1196  // Do not repost this thask unless registering the file
1197  // descriptor fails.
1198  post_rxbuf = 0;
1199  }
1200  }
1201  impl->rxbuf_posted = post_rxbuf;
1202 #if !LELY_NO_THREADS
1203  pthread_mutex_unlock(&impl->mtx);
1204 #endif
1205 
1206  ev_task_queue_post(&queue);
1207 
1208  if (post_rxbuf)
1209  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1210 
1211  errno = errsv;
1212 }
1213 
1214 static void
1215 io_can_chan_impl_read_task_func(struct ev_task *task)
1216 {
1217  assert(task);
1218  struct io_can_chan_impl *impl =
1219  structof(task, struct io_can_chan_impl, read_task);
1220 
1221  struct sllist queue;
1222  sllist_init(&queue);
1223 
1224 #if !LELY_NO_THREADS
1225  pthread_mutex_lock(&impl->mtx);
1226 #endif
1227  // Process any pending read operations that can be satisfied.
1228  int wouldblock = 0;
1229  io_can_chan_impl_do_read(impl, &queue, &wouldblock);
1230  int post_rxbuf = 0;
1231  // Repost this task if any read operations remain in the queue.
1232  int post_read = !sllist_empty(&impl->read_queue) && !impl->shutdown;
1233  // Register a wait operation if the receive queue is empty.
1234  if (post_read && wouldblock) {
1235 #if !LELY_NO_THREADS
1236  pthread_mutex_lock(&impl->c_mtx);
1237 #endif
1238  // Do not repost this task unless the wait condition can be
1239  // satisfied immediately.
1240  post_read = !spscring_c_submit_wait(&impl->rxring, 1,
1241  io_can_chan_impl_c_signal, impl);
1242 #if !LELY_NO_THREADS
1243  pthread_mutex_unlock(&impl->c_mtx);
1244 #endif
1245  if (!post_read)
1246  // If the receive queue is empty, start reading more CAN
1247  // frames, unless we're already waiting for one.
1248  post_rxbuf = !impl->rxbuf_posted
1249  && !(impl->events & IO_EVENT_IN)
1250  && impl->fd != -1 && !impl->shutdown;
1251  }
1252  if (post_rxbuf)
1253  impl->rxbuf_posted = 1;
1254  impl->read_posted = post_read;
1255 #if !LELY_NO_THREADS
1256  pthread_mutex_unlock(&impl->mtx);
1257 #endif
1258 
1259  ev_task_queue_post(&queue);
1260 
1261  if (post_rxbuf)
1262  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1263 
1264  if (post_read)
1265  ev_exec_post(impl->read_task.exec, &impl->read_task);
1266 }
1267 
1268 static void
1269 io_can_chan_impl_write_task_func(struct ev_task *task)
1270 {
1271  assert(task);
1272  struct io_can_chan_impl *impl =
1273  structof(task, struct io_can_chan_impl, write_task);
1274 
1275  int errsv = errno;
1276 
1277  int wouldblock = 0;
1278 
1279 #if !LELY_NO_THREADS
1280  pthread_mutex_lock(&impl->mtx);
1281 #endif
1282  // Try to process all pending write operations at once, unless we're in
1283  // blocking mode.
1284  while ((task = impl->current_write = ev_task_from_node(
1285  sllist_pop_front(&impl->write_queue)))) {
1286  int fd = impl->fd;
1287 #if !LELY_NO_THREADS
1288  pthread_mutex_unlock(&impl->mtx);
1289 #endif
1290  struct io_can_chan_write *write =
1292  int result = io_can_fd_write_msg(fd, write->msg,
1293  impl->poll ? 0 : LELY_IO_TX_TIMEOUT);
1294  int errc = !result ? 0 : errno;
1295  wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1296  if (!wouldblock && errc)
1297  // The operation failed immediately.
1298  io_can_chan_write_post(write, errc);
1299 #if !LELY_NO_THREADS
1300  pthread_mutex_lock(&impl->mtx);
1301 #endif
1302  if (!errc)
1303  // Wait for the write confirmation.
1304  sllist_push_back(&impl->confirm_queue, &task->_node);
1305  if (task == impl->current_write) {
1306  // Put the write operation back on the queue if it would
1307  // block, unless it was canceled.
1308  if (wouldblock) {
1310  &task->_node);
1311  task = NULL;
1312  }
1313  impl->current_write = NULL;
1314  }
1315  assert(!impl->current_write);
1316  // Stop if the operation did or would block.
1317  if (!impl->poll || wouldblock)
1318  break;
1319  }
1320  // If we're waiting for a write confirmation, start reading more CAN
1321  // frames, unless we're already waiting for one.
1322  int post_rxbuf = !impl->rxbuf_posted
1323  && !sllist_empty(&impl->confirm_queue)
1324  && !(impl->events & IO_EVENT_IN) && impl->fd != -1
1325  && !impl->shutdown;
1326  if (post_rxbuf)
1327  impl->rxbuf_posted = 1;
1328  // Repost this task if any write operations remain in the queue.
1329  int post_write = !sllist_empty(&impl->write_queue) && impl->fd != -1
1330  && !impl->shutdown;
1331  // If a write operation would block, start monitoring the file
1332  // descriptor for I/O events.
1333  if (post_write && impl->poll && wouldblock) {
1334  int events = impl->events | IO_EVENT_OUT;
1335  // clang-format off
1336  if (!io_poll_watch(impl->poll, impl->fd, events,
1337  &impl->watch)) {
1338  // clang-format on
1339  impl->events = events;
1340  // Do not repost this task unless registering the file
1341  // descriptor fails.
1342  post_write = 0;
1343  }
1344  }
1345  impl->write_posted = post_write;
1346 #if !LELY_NO_THREADS
1347  pthread_mutex_unlock(&impl->mtx);
1348 #endif
1349 
1350  if (task && wouldblock)
1351  // The operation would block but was canceled before it could be
1352  // requeued.
1353  io_can_chan_write_post(
1354  io_can_chan_write_from_task(task), ECANCELED);
1355 
1356  if (post_rxbuf)
1357  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1358 
1359  if (post_write)
1360  ev_exec_post(impl->write_task.exec, &impl->write_task);
1361 
1362  errno = errsv;
1363 }
1364 
1365 static inline struct io_can_chan_impl *
1366 io_can_chan_impl_from_dev(const io_dev_t *dev)
1367 {
1368  assert(dev);
1369 
1370  return structof(dev, struct io_can_chan_impl, dev_vptr);
1371 }
1372 
1373 static inline struct io_can_chan_impl *
1374 io_can_chan_impl_from_chan(const io_can_chan_t *chan)
1375 {
1376  assert(chan);
1377 
1378  return structof(chan, struct io_can_chan_impl, chan_vptr);
1379 }
1380 
1381 static inline struct io_can_chan_impl *
1382 io_can_chan_impl_from_svc(const struct io_svc *svc)
1383 {
1384  assert(svc);
1385 
1386  return structof(svc, struct io_can_chan_impl, svc);
1387 }
1388 
1389 static void
1390 io_can_chan_impl_c_signal(struct spscring *ring, void *arg)
1391 {
1392  (void)ring;
1393  struct io_can_chan_impl *impl = arg;
1394  assert(impl);
1395 
1396 #if !LELY_NO_THREADS
1397  pthread_mutex_lock(&impl->mtx);
1398 #endif
1399  int post_read = !impl->read_posted && !sllist_empty(&impl->read_queue)
1400  && !impl->shutdown;
1401  if (post_read)
1402  impl->read_posted = 1;
1403 #if !LELY_NO_THREADS
1404  pthread_mutex_unlock(&impl->mtx);
1405 #endif
1406  if (post_read)
1407  ev_exec_post(impl->read_task.exec, &impl->read_task);
1408 }
1409 
1410 static void
1411 io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
1412  struct sllist *read_queue, struct sllist *write_queue,
1413  struct sllist *confirm_queue, struct ev_task *task)
1414 {
1415  assert(impl);
1416  assert(read_queue);
1417  assert(write_queue);
1418  assert(confirm_queue);
1419 
1420  if (!task) {
1421  sllist_append(read_queue, &impl->read_queue);
1422  sllist_append(write_queue, &impl->write_queue);
1423  sllist_append(confirm_queue, &impl->confirm_queue);
1424  } else if (sllist_remove(&impl->read_queue, &task->_node)) {
1425  sllist_push_back(read_queue, &task->_node);
1426  } else if (sllist_remove(&impl->write_queue, &task->_node)) {
1427  sllist_push_back(write_queue, &task->_node);
1428  } else if (sllist_remove(&impl->confirm_queue, &task->_node)) {
1429  sllist_push_back(confirm_queue, &task->_node);
1430  }
1431 }
1432 
1433 static void
1434 io_can_chan_impl_do_read(struct io_can_chan_impl *impl, struct sllist *queue,
1435  int *pwouldblock)
1436 {
1437  assert(impl);
1438  assert(queue);
1439 
1440  int errsv = errno;
1441 
1442  int wouldblock = 0;
1443 
1444  struct slnode *node;
1445  while ((node = sllist_first(&impl->read_queue))) {
1446  struct ev_task *task = ev_task_from_node(node);
1447  struct io_can_chan_read *read =
1449 
1450 #if !LELY_NO_THREADS
1451  pthread_mutex_lock(&impl->c_mtx);
1452 #endif
1453 
1454  // Check if a frame is available in the receive queue.
1455  size_t n = 1;
1456  size_t i = spscring_c_alloc(&impl->rxring, &n);
1457  if (!n) {
1458 #if !LELY_NO_THREADS
1459  pthread_mutex_unlock(&impl->c_mtx);
1460 #endif
1461  wouldblock = 1;
1462  break;
1463  }
1464 
1465  // Copy the frame from the buffer.
1466  struct io_can_frame *frame = &impl->rxbuf[i];
1467  void *data = &frame->frame;
1468  int is_err = can_frame2can_err(data, read->err);
1469  if (!is_err && read->msg) {
1470 #if !LELY_NO_CANFD
1471  if (frame->nbytes == CANFD_MTU)
1472  canfd_frame2can_msg(data, read->msg);
1473  else
1474 #endif
1475  can_frame2can_msg(data, read->msg);
1476  }
1477  if (read->tp)
1478  *read->tp = frame->ts;
1479  spscring_c_commit(&impl->rxring, 1);
1480 
1481 #if !LELY_NO_THREADS
1482  pthread_mutex_unlock(&impl->c_mtx);
1483 #endif
1484 
1485  read->r.result = !is_err;
1486  read->r.errc = 0;
1487 
1488  sllist_pop_front(&impl->read_queue);
1489  sllist_push_back(queue, node);
1490  }
1491 
1492  if (pwouldblock)
1493  *pwouldblock = wouldblock;
1494 
1495  errno = errsv;
1496 }
1497 
1498 static void
1499 io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl, struct sllist *queue,
1500  const struct can_msg *msg)
1501 {
1502  assert(impl);
1503  assert(queue);
1504  assert(msg);
1505 
1506  // Find the matching write operation.
1507  struct slnode *node = sllist_first(&impl->confirm_queue);
1508  while (node) {
1510  ev_task_from_node(node));
1511  if (!can_msg_cmp(msg, write->msg))
1512  break;
1513  node = node->next;
1514  }
1515  if (!node)
1516  return;
1517 
1518  // Complete the matching write operation. Any preceding write operations
1519  // waiting for confirmation are considered to have failed.
1520  struct ev_task *task;
1521  while ((task = ev_task_from_node(
1522  sllist_pop_front(&impl->confirm_queue)))) {
1523  sllist_push_front(queue, &task->_node);
1524  struct io_can_chan_write *write =
1526  if (&task->_node == node) {
1527  write->errc = 0;
1528  break;
1529  } else {
1530  write->errc = EIO;
1531  }
1532  }
1533 }
1534 
1535 static size_t
1536 io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl)
1537 {
1538  assert(impl);
1539 
1540  size_t n = 0;
1541 
1542  // Try to abort io_can_chan_impl_rxbuf_task_func().
1543  // clang-format off
1544  if (impl->rxbuf_posted && ev_exec_abort(impl->rxbuf_task.exec,
1545  &impl->rxbuf_task)) {
1546  // clang-format on
1547  impl->rxbuf_posted = 0;
1548  n++;
1549  }
1550 
1551  // Try to abort io_can_chan_impl_read_task_func().
1552  // clang-format off
1553  if (impl->read_posted && ev_exec_abort(impl->read_task.exec,
1554  &impl->read_task)) {
1555  // clang-format on
1556  impl->read_posted = 0;
1557  n++;
1558  }
1559 
1560  // Try to abort io_can_chan_impl_write_task_func().
1561  // clang-format off
1562  if (impl->write_posted && ev_exec_abort(impl->write_task.exec,
1563  &impl->write_task)) {
1564  // clang-format on
1565  impl->write_posted = 0;
1566  n++;
1567  }
1568 
1569  return n;
1570 }
1571 
1572 static int
1573 io_can_chan_impl_set_fd(struct io_can_chan_impl *impl, int fd, int flags)
1574 {
1575  assert(impl);
1576  assert(!(flags & ~IO_CAN_BUS_FLAG_MASK));
1577 
1578  struct sllist read_queue, write_queue, confirm_queue;
1579  sllist_init(&read_queue);
1580  sllist_init(&write_queue);
1581  sllist_init(&confirm_queue);
1582 
1583 #if !LELY_NO_THREADS
1584  pthread_mutex_lock(&impl->mtx);
1585 #endif
1586 
1587  if (impl->events) {
1588  impl->events = 0;
1589  // Stop monitoring I/O events.
1590  io_poll_watch(impl->poll, impl->fd, impl->events, &impl->watch);
1591  }
1592 
1593 #if !LELY_NO_THREADS
1594  pthread_mutex_lock(&impl->c_mtx);
1595 #endif
1596  spscring_c_abort_wait(&impl->rxring);
1597  // Clear the receive queue.
1598  size_t n = SIZE_MAX;
1599  spscring_c_alloc(&impl->rxring, &n);
1600  if (n)
1601  spscring_c_commit(&impl->rxring, n);
1602 #if !LELY_NO_THREADS
1603  pthread_mutex_unlock(&impl->c_mtx);
1604 #endif
1605 
1606  int tmp = impl->fd;
1607  impl->fd = fd;
1608  fd = tmp;
1609 
1610  impl->flags = flags;
1611 
1612  // Cancel pending operations.
1613  sllist_append(&read_queue, &impl->read_queue);
1614  sllist_append(&write_queue, &impl->write_queue);
1615  sllist_append(&confirm_queue, &impl->confirm_queue);
1616 
1617  // Mark the ongoing write operation as canceled, if necessary.
1618  impl->current_write = NULL;
1619 
1620 #if !LELY_NO_THREADS
1621  pthread_mutex_unlock(&impl->mtx);
1622 #endif
1623 
1624  io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
1625  io_can_chan_write_queue_post(&write_queue, ECANCELED);
1626  io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
1627 
1628  return fd;
1629 }
1630 
1631 #endif // __linux__
int can_msg2canfd_frame(const struct can_msg *src, struct canfd_frame *dst)
Converts a can_msg frame to a SocketCAN CAN FD frame.
Definition: can_msg.h:141
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:126
A CAN or CAN FD format frame.
Definition: msg.h:87
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
size_t spscring_p_capacity(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer...
Definition: spscring.c:65
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
io_ctx_t * ctx
A pointer to the I/O context with which the channel is registered.
Definition: can_chan.c:147
A CAN channel read operation.
Definition: can.h:74
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition: spscring.c:138
This header file is part of the I/O library; it contains the I/O context and service declarations...
This header file is part of the utilities library; it contains the single-producer, single-consumer ring buffer declarations.
int io_can_ctrl_get_flags(const io_can_ctrl_t *ctrl)
Returns the flags specifying which CAN bus features are enabled.
Definition: can_ctrl.c:190
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer, single-consumer ring buffer is available for reading and, if not, registers a signal function to be invoked once the requested range becomes available.
Definition: spscring.c:327
struct ev_task read_task
The task responsible for initiating read operations.
Definition: can_chan.c:155
struct spscring rxring
The ring buffer used to control the receive queue.
Definition: can_chan.c:163
An I/O polling interface.
Definition: poll.c:48
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled...
Definition: can.h:126
unsigned int io_can_ctrl_get_index(const io_can_ctrl_t *ctrl)
Returns the interface index of a CAN controller.
Definition: can_ctrl.c:182
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
Data (other than priority data) MAY be read without blocking.
Definition: event.h:35
int can_msg_cmp(const void *p1, const void *p2)
Compares two CAN or CAN FD format frames.
Definition: msg.c:30
#define LELY_IO_CAN_RXLEN
The default SocketCAN receive queue length (in number of CAN frames).
Definition: can_chan.c:57
void sllist_push_front(struct sllist *list, struct slnode *node)
Pushes a node to the front of a singly-linked list.
Definition: sllist.h:205
int events
The I/O events currently being monitored by poll for fd.
Definition: can_chan.c:178
void diag(enum diag_severity severity, int errc, const char *format,...)
Emits a diagnostic message.
Definition: diag.c:156
Definition: ctx.c:35
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:138
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:121
const struct io_can_chan_vtbl * chan_vptr
A pointer to the virtual table for the CAN channel interface.
Definition: can_chan.c:137
A node in a singly-linked list.
Definition: sllist.h:39
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:221
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:233
struct ev_task rxbuf_task
The task responsible for filling the receive queue.
Definition: can_chan.c:153
struct slnode * sllist_first(const struct sllist *list)
Returns a pointer to the first node in a singly-linked list.
Definition: sllist.h:244
This is the internal header file of the Linux-specific I/O declarations.
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
struct io_poll_watch watch
The object used to monitor the file descriptor for I/O events.
Definition: can_chan.c:151
An error has occurred. This event is always reported.
Definition: event.h:48
This is the internal header file of the SocketCAN rtnetlink attributes functions. ...
unsigned write_posted
A flag indicating whether write_task has been posted to exec.
Definition: can_chan.c:186
This is the internal header file of the SocketCAN error frame conversion functions.
A CAN error frame.
Definition: err.h:28
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:213
FD Format (formerly Extended Data Length) support is enabled.
Definition: can.h:42
A CAN channel write operation.
Definition: can.h:110
Bit Rate Switch support is enabled.
Definition: can.h:44
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
struct ev_task * current_write
The write operation currently being executed.
Definition: can_chan.c:194
unsigned rxbuf_posted
A flag indicating whether rxbuf_task has been posted to exec.
Definition: can_chan.c:182
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
This header file is part of the utilities library; it contains the time function declarations.
The implementation of a CAN channel.
Definition: can_chan.c:133
struct sllist write_queue
The queue containing pending write operations.
Definition: can_chan.c:190
int io_can_chan_release(io_can_chan_t *chan)
Dissociates and returns the SocketCAN file descriptor from a CAN channel.
Definition: can_chan.c:557
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
An I/O service.
Definition: ctx.h:49
This header file is part of the C11 and POSIX compatibility library; it includes <unistd.h>, if it exists, and defines any missing functionality.
struct ev_task write_task
The task responsible for initiating write operations.
Definition: can_chan.c:157
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer, single-consumer ring buffer for writing.
Definition: spscring.c:98
io_poll_t * poll
A pointer to the polling instance used to watch for I/O events.
Definition: can_chan.c:143
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future...
Definition: exec.h:108
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer, single-consumer ring buffer for reading.
Definition: spscring.c:262
A singly-linked list.
Definition: sllist.h:51
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task...
Definition: can.c:91
The FD Format (FDF) flag, formerly known as Extended Data Length (EDL).
Definition: msg.h:54
#define IO_POLL_WATCH_INIT(func)
The static initializer for io_poll_watch.
Definition: poll.h:65
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
struct io_svc svc
The I/O service representing the channel.
Definition: can_chan.c:145
This header file is part of the I/O library; it contains the CAN bus declarations for Linux...
int result
The result of the read operation: 1 if a CAN frame is received, 0 if an error frame is received...
Definition: can.h:68
unsigned read_posted
A flag indicating whether read_task has been posted to exec.
Definition: can_chan.c:184
io_can_chan_t * io_can_chan_create(io_poll_t *poll, ev_exec_t *exec, size_t rxlen)
Creates a new CAN channel.
Definition: can_chan.c:378
int io_can_chan_assign(io_can_chan_t *chan, int fd)
Assigns an existing SocketCAN file descriptor to a CAN channel.
Definition: can_chan.c:499
struct slnode * next
A pointer to the next node in the list.
Definition: sllist.h:41
#define LELY_IO_TX_TIMEOUT
The default timeout (in milliseconds) for I/O write operations.
Definition: io2.h:36
int errc
The error number, obtained as if by get_errc(), if result is -1.
Definition: can.h:70
const struct can_msg * msg
A pointer to the CAN frame to be written.
Definition: can.h:116
int canfd_frame2can_msg(const struct canfd_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN FD frame to a can_msg frame.
Definition: can_msg.h:112
This header file is part of the utilities library; it contains the diagnostic declarations.
int io_poll_watch(io_poll_t *poll, io_handle_t handle, struct io_event *event, int keep)
Registers an I/O device with an I/O polling interface and instructs it to watch for certain events...
Definition: poll.c:249
This header file is part of the I/O library; it contains the I/O polling declarations for POSIX platf...
Reception of error frames is enabled.
Definition: can.h:39
unsigned shutdown
A flag indicating whether the I/O service has been shut down.
Definition: can_chan.c:180
size_t ev_task_queue_post(struct sllist *queue)
Post the tasks in queue to their respective executors and invokes ev_exec_on_task_fini() for each of ...
Definition: task.c:38
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:136
An object representing a file descriptor being monitored for I/O events.
Definition: poll.h:56
struct io_can_chan_read_result r
The result of the read operation.
Definition: can.h:100
struct sllist confirm_queue
The queue containing write operations waiting to be confirmed.
Definition: can_chan.c:192
int fd
The SocketCAN file descriptor.
Definition: can_chan.c:174
A single-producer, single-consumer ring buffer.
Definition: spscring.h:61
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition: poll.c:275
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI...
Definition: msg.h:94
const struct io_dev_vtbl * dev_vptr
A pointer to the virtual table for the I/O device interface.
Definition: can_chan.c:135
An executable task.
Definition: task.h:41
pthread_mutex_t mtx
The mutex protecting the file descriptor, the flags and the queues of pending operations.
Definition: can_chan.c:171
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:190
int io_can_chan_get_handle(const io_can_chan_t *chan)
Returns the SocketCAN file descriptor associated with a CAN channel, or -1 if the channel is closed...
Definition: can_chan.c:414
pthread_mutex_t c_mtx
The mutex protecting the receive queue consumer.
Definition: can_chan.c:160
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
A warning.
Definition: diag.h:47
void io_can_chan_destroy(io_can_chan_t *chan)
Destroys a CAN channel.
Definition: can_chan.c:405
ssize_t io_fd_sendmsg(int fd, const struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX sendmsg(fd, msg, flags | MSG_NOSIGNAL), except that if fd is non-blocking (or the...
Definition: fd.c:116
ssize_t io_fd_recvmsg(int fd, struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX recvmsg(fd, msg, flags), except that if fd is non-blocking (or the implementation...
Definition: fd.c:80
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
The virtual table of an I/O service.
Definition: ctx.h:67
int io_can_chan_is_open(const io_can_chan_t *chan)
Returns 1 if the CAN channel is open and 0 if not.
Definition: can_chan.c:565
Data (bot normal and priority data) MAY be written without blocking.
Definition: event.h:46
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node...
Definition: task.c:32
ev_exec_t * exec
A pointer to the executor used to execute all I/O tasks.
Definition: can_chan.c:149
struct io_can_frame * rxbuf
The receive queue.
Definition: can_chan.c:165
int can_msg2can_frame(const struct can_msg *src, struct can_frame *dst)
Converts a can_msg frame to a SocketCAN CAN frame.
Definition: can_msg.h:80
const struct io_can_ctrl_vtbl *const io_can_ctrl_t
An abstract CAN controller.
Definition: can.h:56
This is the internal header file of the SocketCAN CAN frame conversion functions. ...
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib.h> and defines any missing functionality.
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:184
int io_can_chan_open(io_can_chan_t *chan, const io_can_ctrl_t *ctrl, int flags)
Opens a CAN channel.
Definition: can_chan.c:429
int flags
The flags with which fd has been opened.
Definition: can_chan.c:176
#define LELY_IO_RX_TIMEOUT
The default timeout (in milliseconds) for I/O read operations.
Definition: io2.h:31
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition: spscring.c:302
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the write operation.
Definition: can.h:121
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
int io_can_chan_close(io_can_chan_t *chan)
Closes the SocketCAN file descriptor associated with a CAN channel.
Definition: can_chan.c:571
struct io_can_chan_write * io_can_chan_write_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel write operation from a pointer to its completion task...
Definition: can.c:97
int can_frame2can_msg(const struct can_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN frame to a can_msg frame.
Definition: can_msg.h:51
struct timespec * tp
The address at which to store the system time at which the CAN frame or CAN error frame was received...
Definition: can.h:93
This is the public header file of the utilities library.
struct sllist read_queue
The queue containing pending read operations.
Definition: can_chan.c:188
The Bit Rate Switch (BRS) flag (only available in CAN FD format frames).
Definition: msg.h:62