Lely core libraries  2.2.5
can_chan.c
Go to the documentation of this file.
1 
24 #include "io.h"
25 
26 #ifdef __linux__
27 
28 #include "../can.h"
29 #include <lely/io2/ctx.h>
30 #include <lely/io2/linux/can.h>
31 #include <lely/io2/posix/poll.h>
32 #include <lely/util/diag.h>
33 #include <lely/util/spscring.h>
34 #include <lely/util/time.h>
35 #include <lely/util/util.h>
36 
37 #include <assert.h>
38 #include <errno.h>
39 #include <stdlib.h>
40 
41 #if !LELY_NO_THREADS
42 #include <pthread.h>
43 #endif
44 #include <unistd.h>
45 
46 #include <linux/can/raw.h>
47 #include <linux/sockios.h>
48 #include <sys/ioctl.h>
49 
50 #include "../posix/fd.h"
51 #include "can_attr.h"
52 #include "can_err.h"
53 #include "can_msg.h"
54 
55 #ifndef LELY_IO_CAN_RXLEN
57 #define LELY_IO_CAN_RXLEN 1024
58 #endif
59 
60 struct io_can_frame {
61 #if LELY_NO_CANFD
62  struct can_frame frame;
63 #else
64  struct canfd_frame frame;
65 #endif
66  size_t nbytes;
67  struct timespec ts;
68 };
69 
70 static int io_can_fd_set_default(int fd);
71 #if LELY_NO_CANFD
72 static int io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes,
73  int *pflags, struct timespec *tp, int timeout);
74 #else
75 static int io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes,
76  int *pflags, struct timespec *tp, int timeout);
77 #endif
78 #if LELY_NO_CANFD
79 static int io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
80  int dontwait);
81 #else
82 static int io_can_fd_write(int fd, const struct canfd_frame *frame,
83  size_t nbytes, int timeout);
84 #endif
85 static int io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout);
86 
87 static io_ctx_t *io_can_chan_impl_dev_get_ctx(const io_dev_t *dev);
88 static ev_exec_t *io_can_chan_impl_dev_get_exec(const io_dev_t *dev);
89 static size_t io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task);
90 static size_t io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task);
91 
92 // clang-format off
93 static const struct io_dev_vtbl io_can_chan_impl_dev_vtbl = {
94  &io_can_chan_impl_dev_get_ctx,
95  &io_can_chan_impl_dev_get_exec,
96  &io_can_chan_impl_dev_cancel,
97  &io_can_chan_impl_dev_abort
98 };
99 // clang-format on
100 
101 static io_dev_t *io_can_chan_impl_get_dev(const io_can_chan_t *chan);
102 static int io_can_chan_impl_get_flags(const io_can_chan_t *chan);
103 static int io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
104  struct can_err *err, struct timespec *tp, int timeout);
105 static void io_can_chan_impl_submit_read(
106  io_can_chan_t *chan, struct io_can_chan_read *read);
107 static int io_can_chan_impl_write(
108  io_can_chan_t *chan, const struct can_msg *msg, int timeout);
109 static void io_can_chan_impl_submit_write(
110  io_can_chan_t *chan, struct io_can_chan_write *write);
111 
112 // clang-format off
113 static const struct io_can_chan_vtbl io_can_chan_impl_vtbl = {
114  &io_can_chan_impl_get_dev,
115  &io_can_chan_impl_get_flags,
116  &io_can_chan_impl_read,
117  &io_can_chan_impl_submit_read,
118  &io_can_chan_impl_write,
119  &io_can_chan_impl_submit_write
120 };
121 // clang-format on
122 
123 static void io_can_chan_impl_svc_shutdown(struct io_svc *svc);
124 
125 // clang-format off
126 static const struct io_svc_vtbl io_can_chan_impl_svc_vtbl = {
127  NULL,
128  &io_can_chan_impl_svc_shutdown
129 };
130 // clang-format on
131 
135  const struct io_dev_vtbl *dev_vptr;
145  struct io_svc svc;
151  struct io_poll_watch watch;
153  struct ev_task rxbuf_task;
155  struct ev_task read_task;
157  struct ev_task write_task;
158 #if !LELY_NO_THREADS
160  pthread_mutex_t c_mtx;
161 #endif
163  struct spscring rxring;
166 #if !LELY_NO_THREADS
171  pthread_mutex_t mtx;
172 #endif
174  int fd;
176  int flags;
178  int events;
180  unsigned shutdown : 1;
182  unsigned rxbuf_posted : 1;
184  unsigned read_posted : 1;
186  unsigned write_posted : 1;
188  struct sllist read_queue;
190  struct sllist write_queue;
192  struct sllist confirm_queue;
195 };
196 
197 static void io_can_chan_impl_watch_func(
198  struct io_poll_watch *watch, int events);
199 static void io_can_chan_impl_rxbuf_task_func(struct ev_task *task);
200 static void io_can_chan_impl_read_task_func(struct ev_task *task);
201 static void io_can_chan_impl_write_task_func(struct ev_task *task);
202 
203 static inline struct io_can_chan_impl *io_can_chan_impl_from_dev(
204  const io_dev_t *dev);
205 static inline struct io_can_chan_impl *io_can_chan_impl_from_chan(
206  const io_can_chan_t *chan);
207 static inline struct io_can_chan_impl *io_can_chan_impl_from_svc(
208  const struct io_svc *svc);
209 
210 static void io_can_chan_impl_c_signal(struct spscring *ring, void *arg);
211 
212 static void io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
213  struct sllist *read_queue, struct sllist *write_queue,
214  struct sllist *confirm_queue, struct ev_task *task);
215 
216 static void io_can_chan_impl_do_read(struct io_can_chan_impl *impl,
217  struct sllist *queue, int *pwouldblock);
218 static void io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl,
219  struct sllist *queue, const struct can_msg *msg);
220 
221 static size_t io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl);
222 
223 static int io_can_chan_impl_set_fd(
224  struct io_can_chan_impl *impl, int fd, int flags);
225 
226 void *
227 io_can_chan_alloc(void)
228 {
229  struct io_can_chan_impl *impl = malloc(sizeof(*impl));
230  // cppcheck-suppress memleak symbolName=impl
231  return impl ? &impl->chan_vptr : NULL;
232 }
233 
234 void
235 io_can_chan_free(void *ptr)
236 {
237  if (ptr)
238  free(io_can_chan_impl_from_chan(ptr));
239 }
240 
242 io_can_chan_init(io_can_chan_t *chan, io_poll_t *poll, ev_exec_t *exec,
243  size_t rxlen)
244 {
245  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
246  io_ctx_t *ctx = poll ? io_poll_get_ctx(poll) : NULL;
247 
248  if (!rxlen)
249  rxlen = LELY_IO_CAN_RXLEN;
250 
251  int errsv = 0;
252 
253  impl->dev_vptr = &io_can_chan_impl_dev_vtbl;
254  impl->chan_vptr = &io_can_chan_impl_vtbl;
255 
256  impl->poll = poll;
257 
258  impl->svc = (struct io_svc)IO_SVC_INIT(&io_can_chan_impl_svc_vtbl);
259  impl->ctx = ctx;
260 
261  impl->exec = exec;
262 
263  impl->watch = (struct io_poll_watch)IO_POLL_WATCH_INIT(
264  &io_can_chan_impl_watch_func);
265 
266  impl->rxbuf_task = (struct ev_task)EV_TASK_INIT(
267  impl->exec, &io_can_chan_impl_rxbuf_task_func);
268  impl->read_task = (struct ev_task)EV_TASK_INIT(
269  impl->exec, &io_can_chan_impl_read_task_func);
270  impl->write_task = (struct ev_task)EV_TASK_INIT(
271  impl->exec, &io_can_chan_impl_write_task_func);
272 
273 #if !LELY_NO_THREADS
274  if ((errsv = pthread_mutex_init(&impl->c_mtx, NULL)))
275  goto error_init_c_mtx;
276 #endif
277 
278  spscring_init(&impl->rxring, rxlen);
279  impl->rxbuf = calloc(rxlen, sizeof(struct io_can_frame));
280  if (!impl->rxbuf) {
281  errsv = errno;
282  goto error_alloc_rxbuf;
283  }
284 
285 #if !LELY_NO_THREADS
286  if ((errsv = pthread_mutex_init(&impl->mtx, NULL)))
287  goto error_init_mtx;
288 #endif
289 
290  impl->fd = -1;
291  impl->flags = 0;
292  impl->events = 0;
293 
294  impl->shutdown = 0;
295  impl->rxbuf_posted = 0;
296  impl->read_posted = 0;
297  impl->write_posted = 0;
298 
299  sllist_init(&impl->read_queue);
300  sllist_init(&impl->write_queue);
301  sllist_init(&impl->confirm_queue);
302  impl->current_write = NULL;
303 
304  if (impl->ctx)
305  io_ctx_insert(impl->ctx, &impl->svc);
306 
307  return chan;
308 
309 #if !LELY_NO_THREADS
310  // pthread_mutex_destroy(&impl->mtx);
311 error_init_mtx:
312 #endif
313  free(impl->rxbuf);
314 error_alloc_rxbuf:
315 #if !LELY_NO_THREADS
316  pthread_mutex_destroy(&impl->c_mtx);
317 error_init_c_mtx:
318 #endif
319  errno = errsv;
320  return NULL;
321 }
322 
323 void
324 io_can_chan_fini(io_can_chan_t *chan)
325 {
326  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
327 
328  if (impl->ctx)
329  io_ctx_remove(impl->ctx, &impl->svc);
330  // Cancel all pending operations.
331  io_can_chan_impl_svc_shutdown(&impl->svc);
332 
333 #if !LELY_NO_THREADS
334  // Abort any consumer wait operation running in a task.
335  pthread_mutex_lock(&impl->c_mtx);
337  pthread_mutex_unlock(&impl->c_mtx);
338 
339  int warning = 0;
340  pthread_mutex_lock(&impl->mtx);
341  // If necessary, busy-wait until io_can_chan_impl_rxbuf_task_func(),
342  // io_can_chan_impl_read_task_func() and
343  // io_can_chan_impl_write_task_func() complete.
344  while (impl->rxbuf_posted || impl->read_posted || impl->write_posted) {
345  if (io_can_chan_impl_do_abort_tasks(impl))
346  continue;
347  pthread_mutex_unlock(&impl->mtx);
348  if (!warning) {
349  warning = 1;
350  diag(DIAG_WARNING, 0,
351  "io_can_chan_fini() invoked with pending operations");
352  }
353  sched_yield();
354  pthread_mutex_lock(&impl->mtx);
355  }
356  pthread_mutex_unlock(&impl->mtx);
357 #endif
358 
359  // Close the socket.
360  if (impl->fd != -1) {
361  if (impl->events)
362  io_poll_watch(impl->poll, impl->fd, 0, &impl->watch);
363  close(impl->fd);
364  }
365 
366 #if !LELY_NO_THREADS
367  pthread_mutex_destroy(&impl->mtx);
368 #endif
369 
370  free(impl->rxbuf);
371 
372 #if !LELY_NO_THREADS
373  pthread_mutex_destroy(&impl->c_mtx);
374 #endif
375 }
376 
379 {
380  int errsv = 0;
381 
382  io_can_chan_t *chan = io_can_chan_alloc();
383  if (!chan) {
384  errsv = errno;
385  goto error_alloc;
386  }
387 
388  io_can_chan_t *tmp = io_can_chan_init(chan, poll, exec, rxlen);
389  if (!tmp) {
390  errsv = errno;
391  goto error_init;
392  }
393  chan = tmp;
394 
395  return chan;
396 
397 error_init:
398  io_can_chan_free((void *)chan);
399 error_alloc:
400  errno = errsv;
401  return NULL;
402 }
403 
404 void
406 {
407  if (chan) {
408  io_can_chan_fini(chan);
409  io_can_chan_free((void *)chan);
410  }
411 }
412 
413 int
415 {
416  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
417 
418 #if !LELY_NO_THREADS
419  pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
420 #endif
421  int fd = impl->fd;
422 #if !LELY_NO_THREADS
423  pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
424 #endif
425  return fd;
426 }
427 
428 int
430 {
431  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
432 
434  errno = EINVAL;
435  return -1;
436  }
437 
438  int errsv = 0;
439 
440  int fd = socket(AF_CAN, SOCK_RAW | SOCK_CLOEXEC, CAN_RAW);
441  if (fd == -1) {
442  errsv = errno;
443  goto error_socket;
444  }
445 
446  struct sockaddr_can addr = { .can_family = AF_CAN,
447  .can_ifindex = io_can_ctrl_get_index(ctrl) };
448 
449  if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
450  errsv = errno;
451  goto error_bind;
452  }
453 
454  if (flags & IO_CAN_BUS_FLAG_ERR) {
455  can_err_mask_t optval = CAN_ERR_MASK;
456  // clang-format off
457  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
458  sizeof(optval)) == -1) {
459  // clang-format on
460  errsv = errno;
461  goto error_setsockopt;
462  }
463  }
464 
465 #if !LELY_NO_CANFD
466  if (flags & IO_CAN_BUS_FLAG_FDF) {
467  int optval = 1;
468  // clang-format off
469  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
470  sizeof(optval)) == -1) {
471  // clang-format on
472  errsv = errno;
473  goto error_setsockopt;
474  }
475  }
476 #endif
477 
478  if (io_can_fd_set_default(fd) == -1) {
479  errsv = errno;
480  goto error_set_default;
481  }
482 
483  fd = io_can_chan_impl_set_fd(impl, fd, flags);
484  if (fd != -1)
485  close(fd);
486 
487  return 0;
488 
489 error_set_default:
490 error_setsockopt:
491 error_bind:
492  close(fd);
493 error_socket:
494  errno = errsv;
495  return -1;
496 }
497 
498 int
500 {
501  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
502 
503  struct sockaddr_can addr = { .can_family = AF_UNSPEC };
504  socklen_t addrlen = sizeof(addr);
505  if (getsockname(fd, (struct sockaddr *)&addr, &addrlen) == -1)
506  return -1;
507  if (addrlen < sizeof(addr) || addr.can_family != AF_CAN) {
508  errno = ENODEV;
509  return -1;
510  }
511  unsigned int ifindex = addr.can_ifindex;
512 
513  struct io_can_attr attr = IO_CAN_ATTR_INIT;
514  if (io_can_attr_get(&attr, ifindex) == -1)
515  return -1;
516  int flags = attr.flags;
517 
518  {
519  can_err_mask_t optval = 0;
520  socklen_t optlen = sizeof(optval);
521  // clang-format off
522  if (getsockopt(fd, SOL_CAN_RAW, CAN_RAW_ERR_FILTER, &optval,
523  &optlen) == -1)
524  // clang-format on
525  return -1;
526  if (optval & CAN_ERR_MASK)
527  flags |= IO_CAN_BUS_FLAG_ERR;
528  }
529 
530 #if !LELY_NO_CANFD
531  // Check if CAN FD frames are allowed. If the check fails, we assume
532  // they are not.
533  {
534  int errsv = errno;
535  int optval = 0;
536  socklen_t optlen = sizeof(optval);
537  // clang-format off
538  if (!getsockopt(fd, SOL_CAN_RAW, CAN_RAW_FD_FRAMES, &optval,
539  &optlen) && !optval)
540  // clang-format on
542  errno = errsv;
543  }
544 #endif
545 
546  if (io_can_fd_set_default(fd) == -1)
547  return -1;
548 
549  fd = io_can_chan_impl_set_fd(impl, fd, flags);
550  if (fd != -1)
551  close(fd);
552 
553  return 0;
554 }
555 
556 int
558 {
559  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
560 
561  return io_can_chan_impl_set_fd(impl, -1, 0);
562 }
563 
564 int
566 {
567  return io_can_chan_get_handle(chan) != -1;
568 }
569 
570 int
572 {
573  int fd = io_can_chan_release(chan);
574  return fd != -1 ? close(fd) : 0;
575 }
576 
577 static int
578 io_can_fd_set_default(int fd)
579 {
580  int optval;
581 
582  // Enable local loopback.
583  optval = 1;
584  // clang-format off
585  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_LOOPBACK, &optval,
586  sizeof(optval)) == -1)
587  // clang-format on
588  return -1;
589 
590  // Enable the reception of CAN frames sent by this socket so we can
591  // check for successful transmission.
592  optval = 1;
593  // clang-format off
594  if (setsockopt(fd, SOL_CAN_RAW, CAN_RAW_RECV_OWN_MSGS, &optval,
595  sizeof(optval)) == -1)
596  // clang-format on
597  return -1;
598 
599  // Set the size of the send buffer to its minimum value. This causes
600  // write operations to block (or return EAGAIN) instead of returning
601  // ENOBUFS.
602  optval = 0;
603  // clang-format off
604  if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &optval, sizeof(optval))
605  == -1)
606  // clang-format on
607  return -1;
608 
609  return 0;
610 }
611 
612 static int
613 #if LELY_NO_CANFD
614 io_can_fd_read(int fd, struct can_frame *frame, size_t *pnbytes, int *pflags,
615  struct timespec *tp, int timeout)
616 #else
617 io_can_fd_read(int fd, struct canfd_frame *frame, size_t *pnbytes, int *pflags,
618  struct timespec *tp, int timeout)
619 #endif
620 {
621  struct iovec iov = { .iov_base = (void *)frame,
622  .iov_len = sizeof(*frame) };
623  struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
624 
625  ssize_t result;
626  for (;;) {
627  result = io_fd_recvmsg(fd, &msg, 0, timeout);
628  if (result < 0)
629  return result;
630 #if LELY_NO_CANFD
631  if (result == CAN_MTU)
632 #else
633  if (result == CAN_MTU || result == CANFD_MTU)
634 #endif
635  break;
636  if (timeout > 0)
637  timeout = 0;
638  }
639 
640  if (pnbytes)
641  *pnbytes = result;
642 
643  if (pflags)
644  *pflags = msg.msg_flags;
645 
646  if (tp) {
647  if (msg.msg_flags & MSG_CONFIRM) {
648  // Ignore the timestamp for write confirmations.
649  *tp = (struct timespec){ 0, 0 };
650  } else {
651  struct timeval tv = { 0, 0 };
652  if (ioctl(fd, SIOCGSTAMP, &tv) == -1)
653  return -1;
654  tp->tv_sec = tv.tv_sec;
655  tp->tv_nsec = tv.tv_usec * 1000;
656  }
657  }
658 
659  return 0;
660 }
661 
662 static int
663 #if LELY_NO_CANFD
664 io_can_fd_write(int fd, const struct can_frame *frame, size_t nbytes,
665  int timeout)
666 #else
667 io_can_fd_write(int fd, const struct canfd_frame *frame, size_t nbytes,
668  int timeout)
669 #endif
670 {
671  struct iovec iov = { .iov_base = (void *)frame, .iov_len = nbytes };
672  struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
673 
674  return io_fd_sendmsg(fd, &msg, 0, timeout) > 0 ? 0 : -1;
675 }
676 
677 static int
678 io_can_fd_write_msg(int fd, const struct can_msg *msg, int timeout)
679 {
680  assert(msg);
681 
682  // Convert the frame to the SocketCAN format.
683  struct io_can_frame frame;
684 #if !LELY_NO_CANFD
685  if (msg->flags & CAN_FLAG_FDF) {
686  if (can_msg2canfd_frame(msg, &frame.frame) == -1) {
687  errno = EINVAL;
688  return -1;
689  }
690  frame.nbytes = CANFD_MTU;
691  } else {
692 #endif
693  if (can_msg2can_frame(msg, (struct can_frame *)&frame.frame)
694  == -1) {
695  errno = EINVAL;
696  return -1;
697  }
698  frame.nbytes = CAN_MTU;
699 #if !LELY_NO_CANFD
700  }
701 #endif
702 
703  return io_can_fd_write(fd, &frame.frame, frame.nbytes, timeout);
704 }
705 
706 static io_ctx_t *
707 io_can_chan_impl_dev_get_ctx(const io_dev_t *dev)
708 {
709  const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
710 
711  return impl->ctx;
712 }
713 
714 static ev_exec_t *
715 io_can_chan_impl_dev_get_exec(const io_dev_t *dev)
716 {
717  const struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
718 
719  return impl->exec;
720 }
721 
722 static size_t
723 io_can_chan_impl_dev_cancel(io_dev_t *dev, struct ev_task *task)
724 {
725  struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
726 
727  size_t n = 0;
728 
729  struct sllist read_queue, write_queue, confirm_queue;
730  sllist_init(&read_queue);
731  sllist_init(&write_queue);
732  sllist_init(&confirm_queue);
733 
734 #if !LELY_NO_THREADS
735  pthread_mutex_lock(&impl->mtx);
736 #endif
737  io_can_chan_impl_do_pop(
738  impl, &read_queue, &write_queue, &confirm_queue, task);
739  // Mark the ongoing write operation as canceled, if necessary.
740  if (impl->current_write && (!task || task == impl->current_write)) {
741  impl->current_write = NULL;
742  n++;
743  }
744 #if !LELY_NO_THREADS
745  pthread_mutex_unlock(&impl->mtx);
746 #endif
747 
748  size_t nread = io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
749  n = n < SIZE_MAX - nread ? n + nread : SIZE_MAX;
750  size_t nwrite = io_can_chan_write_queue_post(&write_queue, ECANCELED);
751  n = n < SIZE_MAX - nwrite ? n + nwrite : SIZE_MAX;
752  size_t nconfirm =
753  io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
754  n = n < SIZE_MAX - nconfirm ? n + nconfirm : SIZE_MAX;
755 
756  return n;
757 }
758 
759 static size_t
760 io_can_chan_impl_dev_abort(io_dev_t *dev, struct ev_task *task)
761 {
762  struct io_can_chan_impl *impl = io_can_chan_impl_from_dev(dev);
763 
764  struct sllist queue;
765  sllist_init(&queue);
766 
767 #if !LELY_NO_THREADS
768  pthread_mutex_lock(&impl->mtx);
769 #endif
770  io_can_chan_impl_do_pop(impl, &queue, &queue, &queue, task);
771 #if !LELY_NO_THREADS
772  pthread_mutex_unlock(&impl->mtx);
773 #endif
774 
775  return ev_task_queue_abort(&queue);
776 }
777 static io_dev_t *
778 io_can_chan_impl_get_dev(const io_can_chan_t *chan)
779 {
780  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
781 
782  return &impl->dev_vptr;
783 }
784 
785 static int
786 io_can_chan_impl_get_flags(const io_can_chan_t *chan)
787 {
788  const struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
789 
790 #if !LELY_NO_THREADS
791  pthread_mutex_lock((pthread_mutex_t *)&impl->mtx);
792 #endif
793  int flags = impl->flags;
794 #if !LELY_NO_THREADS
795  pthread_mutex_unlock((pthread_mutex_t *)&impl->mtx);
796 #endif
797  return flags;
798 }
799 
800 static int
801 io_can_chan_impl_read(io_can_chan_t *chan, struct can_msg *msg,
802  struct can_err *err, struct timespec *tp, int timeout)
803 {
804  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
805 
806  struct io_can_frame frame_;
807  struct io_can_frame *frame = &frame_;
808 #if !LELY_NO_THREADS
809  pthread_mutex_lock(&impl->c_mtx);
810 #endif
811  size_t n = 0;
812  for (;;) {
813  // Check if a frame is available in the receive queue.
814  n = 1;
815  size_t i = spscring_c_alloc(&impl->rxring, &n);
816  if (n) {
817  frame = &impl->rxbuf[i];
818  break;
819  }
820 #if !LELY_NO_THREADS
821  pthread_mutex_unlock(&impl->c_mtx);
822  pthread_mutex_lock(&impl->mtx);
823 #endif
824  // If not, read a frame directly.
825  int fd = impl->fd;
826 #if !LELY_NO_THREADS
827  pthread_mutex_unlock(&impl->mtx);
828 #endif
829  int flags = 0;
830  // clang-format off
831  if (io_can_fd_read(fd, &frame->frame, &frame->nbytes, &flags,
832  &frame->ts, timeout) < 0)
833  // clang-format on
834  return -1;
835  // Process the frame unless it is a write confirmation.
836  if (!(flags & MSG_CONFIRM))
837  break;
838  // Convert the frame from the SocketCAN format.
839  void *src = &frame->frame;
840  struct can_msg msg;
841 #if !LELY_NO_CANFD
842  if (frame->nbytes == CANFD_MTU)
843  canfd_frame2can_msg(src, &msg);
844  else
845 #endif
846  can_frame2can_msg(src, &msg);
847  // Process the write confirmation.
848  struct sllist queue;
849  sllist_init(&queue);
850 #if !LELY_NO_THREADS
851  pthread_mutex_lock(&impl->mtx);
852 #endif
853  io_can_chan_impl_do_confirm(impl, &queue, &msg);
854 #if !LELY_NO_THREADS
855  pthread_mutex_unlock(&impl->mtx);
856 #endif
857  ev_task_queue_post(&queue);
858  // Since the timeout is relative, we can only use a positive
859  // value once.
860  if (timeout > 0)
861  timeout = 0;
862 #if !LELY_NO_THREADS
863  pthread_mutex_lock(&impl->c_mtx);
864 #endif
865  }
866  // Parse the frame.
867  void *data = &frame->frame;
868  int is_err = can_frame2can_err(data, err);
869  if (!is_err && msg) {
870 #if !LELY_NO_CANFD
871  if (frame->nbytes == CANFD_MTU)
872  canfd_frame2can_msg(data, msg);
873  else
874 #endif
875  can_frame2can_msg(data, msg);
876  }
877  if (tp)
878  *tp = frame->ts;
879  // Remove the frame from the receive queue, if necessary.
880  if (n) {
881  spscring_c_commit(&impl->rxring, n);
882 #if !LELY_NO_THREADS
883  pthread_mutex_unlock(&impl->c_mtx);
884 #endif
885  }
886 
887  return is_err == -1 ? -1 : !is_err;
888 }
889 
890 static void
891 io_can_chan_impl_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
892 {
893  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
894  assert(read);
895  struct ev_task *task = &read->task;
896 
897  if (!task->exec)
898  task->exec = impl->exec;
899  assert(task->exec);
900  ev_exec_on_task_init(task->exec);
901 
902 #if !LELY_NO_THREADS
903  pthread_mutex_lock(&impl->mtx);
904 #endif
905  if (impl->shutdown) {
906 #if !LELY_NO_THREADS
907  pthread_mutex_unlock(&impl->mtx);
908 #endif
909  io_can_chan_read_post(read, -1, ECANCELED);
910  } else {
911  int post_read = !impl->read_posted
912  && sllist_empty(&impl->read_queue);
913  sllist_push_back(&impl->read_queue, &task->_node);
914  if (post_read)
915  impl->read_posted = 1;
916 #if !LELY_NO_THREADS
917  pthread_mutex_unlock(&impl->mtx);
918 #endif
919  assert(impl->read_task.exec);
920  if (post_read)
921  ev_exec_post(impl->read_task.exec, &impl->read_task);
922  }
923 }
924 
925 static int
926 io_can_chan_impl_write(
927  io_can_chan_t *chan, const struct can_msg *msg, int timeout)
928 {
929  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
930 
931 #if !LELY_NO_CANFD
932  int flags = 0;
933  if (msg->flags & CAN_FLAG_FDF)
935  if (msg->flags & CAN_FLAG_BRS)
937 #endif
938 
939 #if !LELY_NO_THREADS
940  pthread_mutex_lock(&impl->mtx);
941 #endif
942 #if !LELY_NO_CANFD
943  if ((flags & impl->flags) != flags) {
944 #if !LELY_NO_THREADS
945  pthread_mutex_unlock(&impl->mtx);
946 #endif
947  errno = EINVAL;
948  return -1;
949  }
950 #endif
951  int fd = impl->fd;
952 #if !LELY_NO_THREADS
953  pthread_mutex_unlock(&impl->mtx);
954 #endif
955 
956  return io_can_fd_write_msg(fd, msg, timeout);
957 }
958 
959 static void
960 io_can_chan_impl_submit_write(
961  io_can_chan_t *chan, struct io_can_chan_write *write)
962 {
963  struct io_can_chan_impl *impl = io_can_chan_impl_from_chan(chan);
964  assert(write);
965  assert(write->msg);
966  struct ev_task *task = &write->task;
967 
968 #if !LELY_NO_CANFD
969  int flags = 0;
970  if (write->msg->flags & CAN_FLAG_FDF)
971  flags |= IO_CAN_BUS_FLAG_FDF;
972  if (write->msg->flags & CAN_FLAG_BRS)
973  flags |= IO_CAN_BUS_FLAG_BRS;
974 #endif
975 
976  if (!task->exec)
977  task->exec = impl->exec;
978  assert(task->exec);
979  ev_exec_on_task_init(task->exec);
980 
981 #if !LELY_NO_THREADS
982  pthread_mutex_lock(&impl->mtx);
983 #endif
984  if (impl->shutdown) {
985 #if !LELY_NO_THREADS
986  pthread_mutex_unlock(&impl->mtx);
987 #endif
988  io_can_chan_write_post(write, ECANCELED);
989 #if !LELY_NO_CANFD
990  } else if ((flags & impl->flags) != flags) {
991 #if !LELY_NO_THREADS
992  pthread_mutex_unlock(&impl->mtx);
993 #endif
994  io_can_chan_write_post(write, EINVAL);
995 #endif
996  } else {
997  int post_write = !impl->write_posted
998  && sllist_empty(&impl->write_queue);
999  sllist_push_back(&impl->write_queue, &task->_node);
1000  if (post_write)
1001  impl->write_posted = 1;
1002 #if !LELY_NO_THREADS
1003  pthread_mutex_unlock(&impl->mtx);
1004 #endif
1005  assert(impl->write_task.exec);
1006  if (post_write)
1007  ev_exec_post(impl->write_task.exec, &impl->write_task);
1008  }
1009 }
1010 
1011 static void
1012 io_can_chan_impl_svc_shutdown(struct io_svc *svc)
1013 {
1014  struct io_can_chan_impl *impl = io_can_chan_impl_from_svc(svc);
1015  io_dev_t *dev = &impl->dev_vptr;
1016 
1017 #if !LELY_NO_THREADS
1018  pthread_mutex_lock(&impl->mtx);
1019 #endif
1020  int shutdown = !impl->shutdown;
1021  impl->shutdown = 1;
1022  if (shutdown) {
1023  if (impl->events) {
1024  impl->events = 0;
1025  // Stop monitoring I/O events.
1026  io_poll_watch(impl->poll, impl->fd, impl->events,
1027  &impl->watch);
1028  }
1029  // Try to abort io_can_chan_impl_rxbuf_task_func(),
1030  // io_can_chan_impl_read_task_func() and
1031  // io_can_chan_impl_write_task_func().
1032  io_can_chan_impl_do_abort_tasks(impl);
1033  }
1034 #if !LELY_NO_THREADS
1035  pthread_mutex_unlock(&impl->mtx);
1036 #endif
1037 
1038  if (shutdown)
1039  // Cancel all pending operations.
1040  io_can_chan_impl_dev_cancel(dev, NULL);
1041 }
1042 
1043 static void
1044 io_can_chan_impl_watch_func(struct io_poll_watch *watch, int events)
1045 {
1046  assert(watch);
1047  struct io_can_chan_impl *impl =
1049 
1050 #if !LELY_NO_THREADS
1051  pthread_mutex_lock(&impl->mtx);
1052 #endif
1053  // Continue monitoring events that are not otherwise handled.
1054  if ((events & IO_EVENT_ERR) || impl->fd == -1 || impl->shutdown) {
1055  impl->events = 0;
1056  } else if ((impl->events &= ~events) != 0) {
1057  int errsv = errno;
1058  // clang-format off
1059  if (io_poll_watch(impl->poll, impl->fd, impl->events,
1060  &impl->watch) == -1) {
1061  // clang-format on
1062  impl->events = 0;
1063  events |= IO_EVENT_ERR;
1064  }
1065  errno = errsv;
1066  }
1067 
1068  // Process incoming CAN frames.
1069  int post_rxbuf = 0;
1070  if ((events & (IO_EVENT_IN | IO_EVENT_ERR)) && !impl->shutdown) {
1071  post_rxbuf = !impl->rxbuf_posted;
1072  impl->rxbuf_posted = 1;
1073  }
1074 
1075  // Retry any pending write operations.
1076  int post_write = 0;
1077  if ((events & (IO_EVENT_OUT | IO_EVENT_ERR))
1078  && !sllist_empty(&impl->write_queue)
1079  && !impl->shutdown) {
1080  post_write = !impl->write_posted;
1081  impl->write_posted = 1;
1082  }
1083 #if !LELY_NO_THREADS
1084  pthread_mutex_unlock(&impl->mtx);
1085 #endif
1086 
1087  if (post_rxbuf)
1088  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1089  if (post_write)
1090  ev_exec_post(impl->write_task.exec, &impl->write_task);
1091 }
1092 
1093 static void
1094 io_can_chan_impl_rxbuf_task_func(struct ev_task *task)
1095 {
1096  assert(task);
1097  struct io_can_chan_impl *impl =
1098  structof(task, struct io_can_chan_impl, rxbuf_task);
1099 
1100  int errsv = errno;
1101 
1102  struct sllist queue;
1103  sllist_init(&queue);
1104 
1105  int result = 0;
1106  int errc = 0;
1107  int wouldblock = 0;
1108 
1109 #if !LELY_NO_THREADS
1110  pthread_mutex_lock(&impl->mtx);
1111 #endif
1112  // Only try to fill the receive queue if there are pending read
1113  // operations and there is an empty slot in the queue, or if there are
1114  // pending write confirmations.
1115  // clang-format off
1116  while ((!sllist_empty(&impl->read_queue)
1117  && spscring_p_capacity(&impl->rxring))
1118  || !sllist_empty(&impl->confirm_queue)) {
1119  // clang-format on
1120  int fd = impl->fd;
1121 #if !LELY_NO_THREADS
1122  pthread_mutex_unlock(&impl->mtx);
1123 #endif
1124 
1125  struct io_can_frame frame_;
1126  struct io_can_frame *frame = &frame_;
1127  // Try to obtain an empty slot in the receive queue.
1128  size_t n = 1;
1129  size_t i = spscring_p_alloc(&impl->rxring, &n);
1130  if (n)
1131  frame = &impl->rxbuf[i];
1132 
1133  // Try to read a CAN or CAN FD format frame from the CAN bus.
1134  int flags = 0;
1135  result = io_can_fd_read(fd, &frame->frame, &frame->nbytes,
1136  &flags, &frame->ts,
1137  impl->poll ? 0 : LELY_IO_RX_TIMEOUT);
1138  errc = !result ? 0 : errno;
1139  wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1140 
1141  // Convert the frame from the SocketCAN format if it is a write
1142  // confirmation.
1143  struct can_msg msg;
1144  if (!result && (flags & MSG_CONFIRM)) {
1145  void *src = &frame->frame;
1146 #if !LELY_NO_CANFD
1147  if (frame->nbytes == CANFD_MTU)
1148  canfd_frame2can_msg(src, &msg);
1149  else
1150 #endif
1151  can_frame2can_msg(src, &msg);
1152  }
1153 
1154  // Make the frame available for reading.
1155  if (!result && !(flags & MSG_CONFIRM))
1156  spscring_p_commit(&impl->rxring, n);
1157 
1158 #if !LELY_NO_THREADS
1159  pthread_mutex_lock(&impl->mtx);
1160 #endif
1161  // Process the write confirmation, if any.
1162  if (!result && flags & MSG_CONFIRM)
1163  io_can_chan_impl_do_confirm(impl, &queue, &msg);
1164 
1165  // Stop if the operation did or would block, or if an error
1166  // occurred.
1167  if (!impl->poll || result < 0)
1168  break;
1169  }
1170  // Cancel all pending read operations on error.
1171  if (result < 0 && !wouldblock) {
1172  io_can_chan_impl_do_read(impl, &queue, NULL);
1173  while ((task = ev_task_from_node(
1174  sllist_pop_front(&impl->read_queue)))) {
1175  struct io_can_chan_read *read =
1177  read->r.result = result;
1178  read->r.errc = errc;
1179  sllist_push_back(&queue, &task->_node);
1180  }
1181  }
1182  // clang-format off
1183  int post_rxbuf = !(sllist_empty(&impl->read_queue)
1184  && sllist_empty(&impl->confirm_queue))
1185  && impl->fd != -1 && !impl->shutdown;
1186  // clang-format on
1187  // If a read operation would block, start monitoring the file descriptor
1188  // for I/O events.
1189  if (post_rxbuf && impl->poll && wouldblock) {
1190  int events = impl->events | IO_EVENT_IN;
1191  // clang-format off
1192  if (!io_poll_watch(impl->poll, impl->fd, events,
1193  &impl->watch)) {
1194  // clang-format on
1195  impl->events = events;
1196  // Do not repost this thask unless registering the file
1197  // descriptor fails.
1198  post_rxbuf = 0;
1199  }
1200  }
1201  impl->rxbuf_posted = post_rxbuf;
1202 #if !LELY_NO_THREADS
1203  pthread_mutex_unlock(&impl->mtx);
1204 #endif
1205 
1206  ev_task_queue_post(&queue);
1207 
1208  if (post_rxbuf)
1209  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1210 
1211  errno = errsv;
1212 }
1213 
1214 static void
1215 io_can_chan_impl_read_task_func(struct ev_task *task)
1216 {
1217  assert(task);
1218  struct io_can_chan_impl *impl =
1219  structof(task, struct io_can_chan_impl, read_task);
1220 
1221  struct sllist queue;
1222  sllist_init(&queue);
1223 
1224 #if !LELY_NO_THREADS
1225  pthread_mutex_lock(&impl->mtx);
1226 #endif
1227  // Process any pending read operations that can be satisfied.
1228  int wouldblock = 0;
1229  io_can_chan_impl_do_read(impl, &queue, &wouldblock);
1230  int post_rxbuf = 0;
1231  // Repost this task if any read operations remain in the queue.
1232  int post_read = !sllist_empty(&impl->read_queue) && !impl->shutdown;
1233  // Register a wait operation if the receive queue is empty.
1234  if (post_read && wouldblock) {
1235 #if !LELY_NO_THREADS
1236  pthread_mutex_lock(&impl->c_mtx);
1237 #endif
1238  // Do not repost this task unless the wait condition can be
1239  // satisfied immediately.
1240  post_read = !spscring_c_submit_wait(&impl->rxring, 1,
1241  io_can_chan_impl_c_signal, impl);
1242 #if !LELY_NO_THREADS
1243  pthread_mutex_unlock(&impl->c_mtx);
1244 #endif
1245  if (!post_read)
1246  // If the receive queue is empty, start reading more CAN
1247  // frames, unless we're already waiting for one.
1248  post_rxbuf = !impl->rxbuf_posted
1249  && !(impl->events & IO_EVENT_IN)
1250  && impl->fd != -1 && !impl->shutdown;
1251  }
1252  if (post_rxbuf)
1253  impl->rxbuf_posted = 1;
1254  impl->read_posted = post_read;
1255 #if !LELY_NO_THREADS
1256  pthread_mutex_unlock(&impl->mtx);
1257 #endif
1258 
1259  ev_task_queue_post(&queue);
1260 
1261  if (post_rxbuf)
1262  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1263 
1264  if (post_read)
1265  ev_exec_post(impl->read_task.exec, &impl->read_task);
1266 }
1267 
1268 static void
1269 io_can_chan_impl_write_task_func(struct ev_task *task)
1270 {
1271  assert(task);
1272  struct io_can_chan_impl *impl =
1273  structof(task, struct io_can_chan_impl, write_task);
1274 
1275  int errsv = errno;
1276 
1277  int wouldblock = 0;
1278 
1279 #if !LELY_NO_THREADS
1280  pthread_mutex_lock(&impl->mtx);
1281 #endif
1282  // Try to process all pending write operations at once, unless we're in
1283  // blocking mode.
1284  while ((task = impl->current_write = ev_task_from_node(
1285  sllist_pop_front(&impl->write_queue)))) {
1286  int fd = impl->fd;
1287 #if !LELY_NO_THREADS
1288  pthread_mutex_unlock(&impl->mtx);
1289 #endif
1290  struct io_can_chan_write *write =
1292  int result = io_can_fd_write_msg(fd, write->msg,
1293  impl->poll ? 0 : LELY_IO_TX_TIMEOUT);
1294  int errc = !result ? 0 : errno;
1295  wouldblock = errc == EAGAIN || errc == EWOULDBLOCK;
1296  if (!wouldblock && errc)
1297  // The operation failed immediately.
1298  io_can_chan_write_post(write, errc);
1299 #if !LELY_NO_THREADS
1300  pthread_mutex_lock(&impl->mtx);
1301 #endif
1302  if (!errc)
1303  // Wait for the write confirmation.
1304  sllist_push_back(&impl->confirm_queue, &task->_node);
1305  if (task == impl->current_write) {
1306  // Put the write operation back on the queue if it would
1307  // block, unless it was canceled.
1308  if (wouldblock) {
1310  &task->_node);
1311  task = NULL;
1312  }
1313  impl->current_write = NULL;
1314  }
1315  assert(!impl->current_write);
1316  // Stop if the operation did or would block.
1317  if (!impl->poll || wouldblock)
1318  break;
1319  }
1320  // If we're waiting for a write confirmation, start reading more CAN
1321  // frames, unless we're already waiting for one.
1322  int post_rxbuf = !impl->rxbuf_posted
1323  && !sllist_empty(&impl->confirm_queue)
1324  && !(impl->events & IO_EVENT_IN) && impl->fd != -1
1325  && !impl->shutdown;
1326  if (post_rxbuf)
1327  impl->rxbuf_posted = 1;
1328  // Repost this task if any write operations remain in the queue.
1329  int post_write = !sllist_empty(&impl->write_queue) && impl->fd != -1
1330  && !impl->shutdown;
1331  // If a write operation would block, start monitoring the file
1332  // descriptor for I/O events.
1333  if (post_write && impl->poll && wouldblock) {
1334  int events = impl->events | IO_EVENT_OUT;
1335  // clang-format off
1336  if (!io_poll_watch(impl->poll, impl->fd, events,
1337  &impl->watch)) {
1338  // clang-format on
1339  impl->events = events;
1340  // Do not repost this task unless registering the file
1341  // descriptor fails.
1342  post_write = 0;
1343  }
1344  }
1345  impl->write_posted = post_write;
1346 #if !LELY_NO_THREADS
1347  pthread_mutex_unlock(&impl->mtx);
1348 #endif
1349 
1350  if (task && wouldblock)
1351  // The operation would block but was canceled before it could be
1352  // requeued.
1353  io_can_chan_write_post(
1354  io_can_chan_write_from_task(task), ECANCELED);
1355 
1356  if (post_rxbuf)
1357  ev_exec_post(impl->rxbuf_task.exec, &impl->rxbuf_task);
1358 
1359  if (post_write)
1360  ev_exec_post(impl->write_task.exec, &impl->write_task);
1361 
1362  errno = errsv;
1363 }
1364 
1365 static inline struct io_can_chan_impl *
1366 io_can_chan_impl_from_dev(const io_dev_t *dev)
1367 {
1368  assert(dev);
1369 
1370  return structof(dev, struct io_can_chan_impl, dev_vptr);
1371 }
1372 
1373 static inline struct io_can_chan_impl *
1374 io_can_chan_impl_from_chan(const io_can_chan_t *chan)
1375 {
1376  assert(chan);
1377 
1378  return structof(chan, struct io_can_chan_impl, chan_vptr);
1379 }
1380 
1381 static inline struct io_can_chan_impl *
1382 io_can_chan_impl_from_svc(const struct io_svc *svc)
1383 {
1384  assert(svc);
1385 
1386  return structof(svc, struct io_can_chan_impl, svc);
1387 }
1388 
1389 static void
1390 io_can_chan_impl_c_signal(struct spscring *ring, void *arg)
1391 {
1392  (void)ring;
1393  struct io_can_chan_impl *impl = arg;
1394  assert(impl);
1395 
1396 #if !LELY_NO_THREADS
1397  pthread_mutex_lock(&impl->mtx);
1398 #endif
1399  int post_read = !impl->read_posted && !sllist_empty(&impl->read_queue)
1400  && !impl->shutdown;
1401  if (post_read)
1402  impl->read_posted = 1;
1403 #if !LELY_NO_THREADS
1404  pthread_mutex_unlock(&impl->mtx);
1405 #endif
1406  if (post_read)
1407  ev_exec_post(impl->read_task.exec, &impl->read_task);
1408 }
1409 
1410 static void
1411 io_can_chan_impl_do_pop(struct io_can_chan_impl *impl,
1412  struct sllist *read_queue, struct sllist *write_queue,
1413  struct sllist *confirm_queue, struct ev_task *task)
1414 {
1415  assert(impl);
1416  assert(read_queue);
1417  assert(write_queue);
1418  assert(confirm_queue);
1419 
1420  if (!task) {
1424  } else if (sllist_remove(&impl->read_queue, &task->_node)) {
1425  sllist_push_back(read_queue, &task->_node);
1426  } else if (sllist_remove(&impl->write_queue, &task->_node)) {
1427  sllist_push_back(write_queue, &task->_node);
1428  } else if (sllist_remove(&impl->confirm_queue, &task->_node)) {
1429  sllist_push_back(confirm_queue, &task->_node);
1430  }
1431 }
1432 
1433 static void
1434 io_can_chan_impl_do_read(struct io_can_chan_impl *impl, struct sllist *queue,
1435  int *pwouldblock)
1436 {
1437  assert(impl);
1438  assert(queue);
1439 
1440  int errsv = errno;
1441 
1442  int wouldblock = 0;
1443 
1444  struct slnode *node;
1445  while ((node = sllist_first(&impl->read_queue))) {
1446  struct ev_task *task = ev_task_from_node(node);
1447  struct io_can_chan_read *read =
1449 
1450 #if !LELY_NO_THREADS
1451  pthread_mutex_lock(&impl->c_mtx);
1452 #endif
1453 
1454  // Check if a frame is available in the receive queue.
1455  size_t n = 1;
1456  size_t i = spscring_c_alloc(&impl->rxring, &n);
1457  if (!n) {
1458 #if !LELY_NO_THREADS
1459  pthread_mutex_unlock(&impl->c_mtx);
1460 #endif
1461  wouldblock = 1;
1462  break;
1463  }
1464 
1465  // Copy the frame from the buffer.
1466  struct io_can_frame *frame = &impl->rxbuf[i];
1467  void *data = &frame->frame;
1468  int is_err = can_frame2can_err(data, read->err);
1469  if (!is_err && read->msg) {
1470 #if !LELY_NO_CANFD
1471  if (frame->nbytes == CANFD_MTU)
1472  canfd_frame2can_msg(data, read->msg);
1473  else
1474 #endif
1475  can_frame2can_msg(data, read->msg);
1476  }
1477  if (read->tp)
1478  *read->tp = frame->ts;
1479  spscring_c_commit(&impl->rxring, 1);
1480 
1481 #if !LELY_NO_THREADS
1482  pthread_mutex_unlock(&impl->c_mtx);
1483 #endif
1484 
1485  read->r.result = !is_err;
1486  read->r.errc = 0;
1487 
1488  sllist_pop_front(&impl->read_queue);
1489  sllist_push_back(queue, node);
1490  }
1491 
1492  if (pwouldblock)
1493  *pwouldblock = wouldblock;
1494 
1495  errno = errsv;
1496 }
1497 
1498 static void
1499 io_can_chan_impl_do_confirm(struct io_can_chan_impl *impl, struct sllist *queue,
1500  const struct can_msg *msg)
1501 {
1502  assert(impl);
1503  assert(queue);
1504  assert(msg);
1505 
1506  // Find the matching write operation.
1507  struct slnode *node = sllist_first(&impl->confirm_queue);
1508  while (node) {
1510  ev_task_from_node(node));
1511  if (!can_msg_cmp(msg, write->msg))
1512  break;
1513  node = node->next;
1514  }
1515  if (!node)
1516  return;
1517 
1518  // Complete the matching write operation. Any preceding write operations
1519  // waiting for confirmation are considered to have failed.
1520  struct ev_task *task;
1521  while ((task = ev_task_from_node(
1522  sllist_pop_front(&impl->confirm_queue)))) {
1523  sllist_push_front(queue, &task->_node);
1524  struct io_can_chan_write *write =
1526  if (&task->_node == node) {
1527  write->errc = 0;
1528  break;
1529  } else {
1530  write->errc = EIO;
1531  }
1532  }
1533 }
1534 
1535 static size_t
1536 io_can_chan_impl_do_abort_tasks(struct io_can_chan_impl *impl)
1537 {
1538  assert(impl);
1539 
1540  size_t n = 0;
1541 
1542  // Try to abort io_can_chan_impl_rxbuf_task_func().
1543  // clang-format off
1544  if (impl->rxbuf_posted && ev_exec_abort(impl->rxbuf_task.exec,
1545  &impl->rxbuf_task)) {
1546  // clang-format on
1547  impl->rxbuf_posted = 0;
1548  n++;
1549  }
1550 
1551  // Try to abort io_can_chan_impl_read_task_func().
1552  // clang-format off
1553  if (impl->read_posted && ev_exec_abort(impl->read_task.exec,
1554  &impl->read_task)) {
1555  // clang-format on
1556  impl->read_posted = 0;
1557  n++;
1558  }
1559 
1560  // Try to abort io_can_chan_impl_write_task_func().
1561  // clang-format off
1562  if (impl->write_posted && ev_exec_abort(impl->write_task.exec,
1563  &impl->write_task)) {
1564  // clang-format on
1565  impl->write_posted = 0;
1566  n++;
1567  }
1568 
1569  return n;
1570 }
1571 
1572 static int
1573 io_can_chan_impl_set_fd(struct io_can_chan_impl *impl, int fd, int flags)
1574 {
1575  assert(impl);
1576  assert(!(flags & ~IO_CAN_BUS_FLAG_MASK));
1577 
1578  struct sllist read_queue, write_queue, confirm_queue;
1579  sllist_init(&read_queue);
1580  sllist_init(&write_queue);
1581  sllist_init(&confirm_queue);
1582 
1583 #if !LELY_NO_THREADS
1584  pthread_mutex_lock(&impl->mtx);
1585 #endif
1586 
1587  if (impl->events) {
1588  impl->events = 0;
1589  // Stop monitoring I/O events.
1590  io_poll_watch(impl->poll, impl->fd, impl->events, &impl->watch);
1591  }
1592 
1593 #if !LELY_NO_THREADS
1594  pthread_mutex_lock(&impl->c_mtx);
1595 #endif
1596  spscring_c_abort_wait(&impl->rxring);
1597  // Clear the receive queue.
1598  size_t n = SIZE_MAX;
1599  spscring_c_alloc(&impl->rxring, &n);
1600  if (n)
1601  spscring_c_commit(&impl->rxring, n);
1602 #if !LELY_NO_THREADS
1603  pthread_mutex_unlock(&impl->c_mtx);
1604 #endif
1605 
1606  int tmp = impl->fd;
1607  impl->fd = fd;
1608  fd = tmp;
1609 
1610  impl->flags = flags;
1611 
1612  // Cancel pending operations.
1613  sllist_append(&read_queue, &impl->read_queue);
1614  sllist_append(&write_queue, &impl->write_queue);
1615  sllist_append(&confirm_queue, &impl->confirm_queue);
1616 
1617  // Mark the ongoing write operation as canceled, if necessary.
1618  impl->current_write = NULL;
1619 
1620 #if !LELY_NO_THREADS
1621  pthread_mutex_unlock(&impl->mtx);
1622 #endif
1623 
1624  io_can_chan_read_queue_post(&read_queue, -1, ECANCELED);
1625  io_can_chan_write_queue_post(&write_queue, ECANCELED);
1626  io_can_chan_write_queue_post(&confirm_queue, ECANCELED);
1627 
1628  return fd;
1629 }
1630 
1631 #endif // __linux__
@ CAN_FLAG_FDF
The FD Format (FDF) flag, formerly known as Extended Data Length (EDL).
Definition: msg.h:54
@ CAN_FLAG_BRS
The Bit Rate Switch (BRS) flag (only available in CAN FD format frames).
Definition: msg.h:62
This is the internal header file of the SocketCAN rtnetlink attributes functions.
void io_can_chan_destroy(io_can_chan_t *chan)
Destroys a CAN channel.
Definition: can_chan.c:405
int io_can_chan_get_handle(const io_can_chan_t *chan)
Returns the SocketCAN file descriptor associated with a CAN channel, or -1 if the channel is closed.
Definition: can_chan.c:414
int io_can_chan_close(io_can_chan_t *chan)
Closes the SocketCAN file descriptor associated with a CAN channel.
Definition: can_chan.c:571
#define LELY_IO_CAN_RXLEN
The default SocketCAN receive queue length (in number of CAN frames).
Definition: can_chan.c:57
int io_can_chan_open(io_can_chan_t *chan, const io_can_ctrl_t *ctrl, int flags)
Opens a CAN channel.
Definition: can_chan.c:429
int io_can_chan_release(io_can_chan_t *chan)
Dissociates and returns the SocketCAN file descriptor from a CAN channel.
Definition: can_chan.c:557
int io_can_chan_assign(io_can_chan_t *chan, int fd)
Assigns an existing SocketCAN file descriptor to a CAN channel.
Definition: can_chan.c:499
io_can_chan_t * io_can_chan_create(io_poll_t *poll, ev_exec_t *exec, size_t rxlen)
Creates a new CAN channel.
Definition: can_chan.c:378
int io_can_chan_is_open(const io_can_chan_t *chan)
Returns 1 if the CAN channel is open and 0 if not.
Definition: can_chan.c:565
This is the internal header file of the SocketCAN error frame conversion functions.
This is the internal header file of the SocketCAN CAN frame conversion functions.
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:121
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:136
This header file is part of the utilities library; it contains the diagnostic declarations.
@ DIAG_WARNING
A warning.
Definition: diag.h:47
void diag(enum diag_severity severity, int errc, const char *format,...)
Emits a diagnostic message.
Definition: diag.c:156
@ IO_EVENT_IN
Data (other than priority data) MAY be read without blocking.
Definition: event.h:35
@ IO_EVENT_OUT
Data (bot normal and priority data) MAY be written without blocking.
Definition: event.h:46
@ IO_EVENT_ERR
An error has occurred. This event is always reported.
Definition: event.h:48
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:138
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:126
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:108
ssize_t io_fd_sendmsg(int fd, const struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX sendmsg(fd, msg, flags | MSG_NOSIGNAL), except that if fd is non-blocking (or the...
Definition: fd.c:116
ssize_t io_fd_recvmsg(int fd, struct msghdr *msg, int flags, int timeout)
Equivalent to POSIX recvmsg(fd, msg, flags), except that if fd is non-blocking (or the implementation...
Definition: fd.c:80
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
struct io_can_chan_write * io_can_chan_write_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel write operation from a pointer to its completion task.
Definition: can.c:97
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:91
const struct io_can_ctrl_vtbl *const io_can_ctrl_t
An abstract CAN controller.
Definition: can.h:56
@ IO_CAN_BUS_FLAG_BRS
Bit Rate Switch support is enabled.
Definition: can.h:44
@ IO_CAN_BUS_FLAG_FDF
FD Format (formerly Extended Data Length) support is enabled.
Definition: can.h:42
@ IO_CAN_BUS_FLAG_ERR
Reception of error frames is enabled.
Definition: can.h:39
#define LELY_IO_RX_TIMEOUT
The default timeout (in milliseconds) for I/O read operations.
Definition: io2.h:31
#define LELY_IO_TX_TIMEOUT
The default timeout (in milliseconds) for I/O write operations.
Definition: io2.h:36
This header file is part of the I/O library; it contains the CAN bus declarations for Linux.
unsigned int io_can_ctrl_get_index(const io_can_ctrl_t *ctrl)
Returns the interface index of a CAN controller.
Definition: can_ctrl.c:182
int io_can_ctrl_get_flags(const io_can_ctrl_t *ctrl)
Returns the flags specifying which CAN bus features are enabled.
Definition: can_ctrl.c:190
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
int can_msg_cmp(const void *p1, const void *p2)
Compares two CAN or CAN FD format frames.
Definition: msg.c:30
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
This header file is part of the I/O library; it contains the I/O polling declarations for POSIX platf...
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition: poll.c:275
#define IO_POLL_WATCH_INIT(func)
The static initializer for io_poll_watch.
Definition: poll.h:65
int io_poll_watch(io_poll_t *poll, io_handle_t handle, struct io_event *event, int keep)
Registers an I/O device with an I/O polling interface and instructs it to watch for certain events.
Definition: poll.c:249
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:184
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:233
void sllist_push_front(struct sllist *list, struct slnode *node)
Pushes a node to the front of a singly-linked list.
Definition: sllist.h:205
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:213
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:190
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:221
struct slnode * sllist_first(const struct sllist *list)
Returns a pointer to the first node in a singly-linked list.
Definition: sllist.h:244
int can_msg2canfd_frame(const struct can_msg *src, struct canfd_frame *dst)
Converts a can_msg frame to a SocketCAN CAN FD frame.
Definition: can_msg.h:141
int can_frame2can_msg(const struct can_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN frame to a can_msg frame.
Definition: can_msg.h:51
int canfd_frame2can_msg(const struct canfd_frame *src, struct can_msg *dst)
Converts a SocketCAN CAN FD frame to a can_msg frame.
Definition: can_msg.h:112
int can_msg2can_frame(const struct can_msg *src, struct can_frame *dst)
Converts a can_msg frame to a SocketCAN CAN frame.
Definition: can_msg.h:80
This header file is part of the utilities library; it contains the single-producer,...
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:262
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:327
size_t spscring_p_capacity(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer,...
Definition: spscring.c:65
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition: spscring.c:138
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition: spscring.c:302
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:98
This is the internal header file of the Windows-specific I/O declarations.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
An I/O polling interface.
Definition: poll.c:48
A CAN error frame.
Definition: err.h:28
A CAN or CAN FD format frame.
Definition: msg.h:87
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI...
Definition: msg.h:94
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
The implementation of a CAN channel.
Definition: can_chan.c:133
struct ev_task read_task
The task responsible for initiating read operations.
Definition: can_chan.c:155
int fd
The SocketCAN file descriptor.
Definition: can_chan.c:174
struct ev_task write_task
The task responsible for initiating write operations.
Definition: can_chan.c:157
struct io_can_frame * rxbuf
The receive queue.
Definition: can_chan.c:165
struct io_svc svc
The I/O service representing the channel.
Definition: can_chan.c:145
struct sllist read_queue
The queue containing pending read operations.
Definition: can_chan.c:188
struct ev_task rxbuf_task
The task responsible for filling the receive queue.
Definition: can_chan.c:153
io_poll_t * poll
A pointer to the polling instance used to watch for I/O events.
Definition: can_chan.c:143
unsigned write_posted
A flag indicating whether write_task has been posted to exec.
Definition: can_chan.c:186
int flags
The flags with which fd has been opened.
Definition: can_chan.c:176
unsigned read_posted
A flag indicating whether read_task has been posted to exec.
Definition: can_chan.c:184
pthread_mutex_t mtx
The mutex protecting the file descriptor, the flags and the queues of pending operations.
Definition: can_chan.c:171
struct sllist write_queue
The queue containing pending write operations.
Definition: can_chan.c:190
unsigned shutdown
A flag indicating whether the I/O service has been shut down.
Definition: can_chan.c:180
ev_exec_t * exec
A pointer to the executor used to execute all I/O tasks.
Definition: can_chan.c:149
const struct io_dev_vtbl * dev_vptr
A pointer to the virtual table for the I/O device interface.
Definition: can_chan.c:135
struct sllist confirm_queue
The queue containing write operations waiting to be confirmed.
Definition: can_chan.c:192
struct ev_task * current_write
The write operation currently being executed.
Definition: can_chan.c:194
pthread_mutex_t c_mtx
The mutex protecting the receive queue consumer.
Definition: can_chan.c:160
struct io_poll_watch watch
The object used to monitor the file descriptor for I/O events.
Definition: can_chan.c:151
const struct io_can_chan_vtbl * chan_vptr
A pointer to the virtual table for the CAN channel interface.
Definition: can_chan.c:137
io_ctx_t * ctx
A pointer to the I/O context with which the channel is registered.
Definition: can_chan.c:147
int events
The I/O events currently being monitored by poll for fd.
Definition: can_chan.c:178
struct spscring rxring
The ring buffer used to control the receive queue.
Definition: can_chan.c:163
unsigned rxbuf_posted
A flag indicating whether rxbuf_task has been posted to exec.
Definition: can_chan.c:182
int result
The result of the read operation: 1 if a CAN frame is received, 0 if an error frame is received,...
Definition: can.h:68
int errc
The error number, obtained as if by get_errc(), if result is -1.
Definition: can.h:70
A CAN channel read operation.
Definition: can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
struct io_can_chan_read_result r
The result of the read operation.
Definition: can.h:100
struct timespec * tp
The address at which to store the system time at which the CAN frame or CAN error frame was received.
Definition: can.h:93
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
A CAN channel write operation.
Definition: can.h:110
const struct can_msg * msg
A pointer to the CAN frame to be written.
Definition: can.h:116
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the write operation.
Definition: can.h:121
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: can.h:126
Definition: ctx.c:35
An object representing a file descriptor being monitored for I/O events.
Definition: poll.h:56
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A singly-linked list.
Definition: sllist.h:51
A node in a singly-linked list.
Definition: sllist.h:39
struct slnode * next
A pointer to the next node in the list.
Definition: sllist.h:41
A single-producer, single-consumer ring buffer.
Definition: spscring.h:61
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
size_t ev_task_queue_post(struct sllist *queue)
Post the tasks in queue to their respective executors and invokes ev_exec_on_task_fini() for each of ...
Definition: task.c:38
This header file is part of the C11 and POSIX compatibility library; it includes <unistd....
This header file is part of the utilities library; it contains the time function declarations.