Lely core libraries  2.3.4
can.c
Go to the documentation of this file.
1 
24 #include "../can.h"
25 
26 #if !LELY_NO_MALLOC
27 
28 #if !LELY_NO_THREADS
29 #include <lely/libc/threads.h>
30 #endif
31 #include <lely/io2/ctx.h>
32 #include <lely/io2/user/can.h>
33 #include <lely/util/diag.h>
34 #include <lely/util/errnum.h>
35 #include <lely/util/spscring.h>
36 #include <lely/util/time.h>
37 #include <lely/util/util.h>
38 
39 #include <assert.h>
40 #include <stdlib.h>
41 
// The default receive queue length (in number of CAN frames) of the
// user-defined CAN channel. io_user_can_chan_init() falls back to this value
// when it is invoked with a receive queue length of 0.
42 #ifndef LELY_IO_USER_CAN_RXLEN
47 #define LELY_IO_USER_CAN_RXLEN 1024
48 #endif
49 
51  int is_err;
52  union {
53  struct can_msg msg;
54  struct can_err err;
55  } u;
56  struct timespec ts;
57 };
58 
// Forward declarations of the functions implementing the I/O device
// interface; their addresses populate io_user_can_chan_dev_vtbl.
59 static io_ctx_t *io_user_can_chan_dev_get_ctx(const io_dev_t *dev);
60 static ev_exec_t *io_user_can_chan_dev_get_exec(const io_dev_t *dev);
61 static size_t io_user_can_chan_dev_cancel(io_dev_t *dev, struct ev_task *task);
62 static size_t io_user_can_chan_dev_abort(io_dev_t *dev, struct ev_task *task);
63 
// The virtual table for the I/O device interface. io_user_can_chan_init()
// stores its address in the dev_vptr member of the channel.
64 // clang-format off
65 static const struct io_dev_vtbl io_user_can_chan_dev_vtbl = {
66  &io_user_can_chan_dev_get_ctx,
67  &io_user_can_chan_dev_get_exec,
68  &io_user_can_chan_dev_cancel,
69  &io_user_can_chan_dev_abort
70 };
71 // clang-format on
72 
// Forward declarations of the functions implementing the CAN channel
// interface; their addresses populate io_user_can_chan_vtbl.
73 static io_dev_t *io_user_can_chan_get_dev(const io_can_chan_t *chan);
74 static int io_user_can_chan_get_flags(const io_can_chan_t *chan);
75 static int io_user_can_chan_read(io_can_chan_t *chan, struct can_msg *msg,
76  struct can_err *err, struct timespec *tp, int timeout);
77 static void io_user_can_chan_submit_read(
78  io_can_chan_t *chan, struct io_can_chan_read *read);
79 static int io_user_can_chan_write(
80  io_can_chan_t *chan, const struct can_msg *msg, int timeout);
81 static void io_user_can_chan_submit_write(
82  io_can_chan_t *chan, struct io_can_chan_write *write);
83 
// The virtual table for the CAN channel interface. io_user_can_chan_init()
// stores its address in the chan_vptr member of the channel.
84 // clang-format off
85 static const struct io_can_chan_vtbl io_user_can_chan_vtbl = {
86  &io_user_can_chan_get_dev,
87  &io_user_can_chan_get_flags,
88  &io_user_can_chan_read,
89  &io_user_can_chan_submit_read,
90  &io_user_can_chan_write,
91  &io_user_can_chan_submit_write
92 };
93 // clang-format on
94 
// The shutdown handler of the I/O service embedded in the channel.
95 static void io_user_can_chan_svc_shutdown(struct io_svc *svc);
96 
// The virtual table of the I/O service embedded in the channel (see
// IO_SVC_INIT() in io_user_can_chan_init()). Only the shutdown handler is
// provided; the first entry is deliberately left NULL.
97 // clang-format off
98 static const struct io_svc_vtbl io_user_can_chan_svc_vtbl = {
99  NULL,
100  &io_user_can_chan_svc_shutdown
101 };
102 // clang-format on
103 
107  const struct io_dev_vtbl *dev_vptr;
111  struct io_svc svc;
117  int flags;
122  int txtimeo;
131  void *arg;
133  struct ev_task read_task;
135  struct ev_task write_task;
136 #if !LELY_NO_THREADS
145 #endif
147  struct spscring rxring;
150 #if !LELY_NO_THREADS
156 #endif
158  unsigned shutdown : 1;
160  unsigned read_posted : 1;
162  unsigned write_posted : 1;
164  struct sllist read_queue;
166  struct sllist write_queue;
171 };
172 
// The functions executed by the read_task and write_task members of the
// channel (see the EV_TASK_INIT() calls in io_user_can_chan_init()).
173 static void io_user_can_chan_read_task_func(struct ev_task *task);
174 static void io_user_can_chan_write_task_func(struct ev_task *task);
175 
// Helpers that convert a pointer to an embedded member (dev_vptr, chan_vptr
// or svc) back to a pointer to the containing channel.
176 static inline struct io_user_can_chan *io_user_can_chan_from_dev(
177  const io_dev_t *dev);
178 static inline struct io_user_can_chan *io_user_can_chan_from_chan(
179  const io_can_chan_t *chan);
180 static inline struct io_user_can_chan *io_user_can_chan_from_svc(
181  const struct io_svc *svc);
182 
// Copies a frame into the receive queue, waiting for a free slot if the
// timeout allows it.
183 static int io_user_can_chan_on_frame(struct io_user_can_chan *user,
184  const struct io_user_can_frame *frame, int timeout);
185 
// The callbacks registered with spscring_p_submit_wait() and
// spscring_c_submit_wait(), respectively.
186 static void io_user_can_chan_p_signal(struct spscring *ring, void *arg);
187 static void io_user_can_chan_c_signal(struct spscring *ring, void *arg);
188 
// Moves pending operations from the queues of the channel to the supplied
// queues. The caller is expected to hold the channel mutex.
189 static void io_user_can_chan_do_pop(struct io_user_can_chan *user,
190  struct sllist *read_queue, struct sllist *write_queue,
191  struct ev_task *task);
192 
// Tries to abort the posted read and write tasks; returns the number of
// aborted tasks.
193 static size_t io_user_can_chan_do_abort_tasks(struct io_user_can_chan *user);
194 
195 void *
196 io_user_can_chan_alloc(void)
197 {
198  struct io_user_can_chan *user = malloc(sizeof(*user));
199  if (!user) {
200 #if !LELY_NO_ERRNO
201  set_errc(errno2c(errno));
202 #endif
203  return NULL;
204  }
205  // Suppress a GCC maybe-uninitialized warning.
206  user->chan_vptr = NULL;
207  // cppcheck-suppress memleak symbolName=user
208  return &user->chan_vptr;
209 }
210 
// Releases the memory obtained from io_user_can_chan_alloc().
//
// ptr: a pointer returned by io_user_can_chan_alloc(); NULL is ignored.
void
io_user_can_chan_free(void *ptr)
{
	// The conversion below asserts on NULL, so bail out first.
	if (!ptr)
		return;

	free(io_user_can_chan_from_chan(ptr));
}
217 
219 io_user_can_chan_init(io_can_chan_t *chan, io_ctx_t *ctx, ev_exec_t *exec,
220  int flags, size_t rxlen, int txtimeo,
222 {
223  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
224  assert(ctx);
225  assert(exec);
226 
227  if (flags & ~IO_CAN_BUS_FLAG_MASK) {
229  return NULL;
230  }
231 
232  if (!rxlen)
233  rxlen = LELY_IO_USER_CAN_RXLEN;
234 
235  if (!txtimeo)
237 
238  int errc = 0;
239 
240  user->dev_vptr = &io_user_can_chan_dev_vtbl;
241  user->chan_vptr = &io_user_can_chan_vtbl;
242 
243  user->svc = (struct io_svc)IO_SVC_INIT(&io_user_can_chan_svc_vtbl);
244  user->ctx = ctx;
245 
246  user->exec = exec;
247 
248  user->flags = flags;
249  user->txtimeo = txtimeo;
250 
251  user->func = func;
252  user->arg = arg;
253 
254  user->read_task = (struct ev_task)EV_TASK_INIT(
255  user->exec, &io_user_can_chan_read_task_func);
256  user->write_task = (struct ev_task)EV_TASK_INIT(
257  user->exec, &io_user_can_chan_write_task_func);
258 
259 #if !LELY_NO_THREADS
260  if (mtx_init(&user->p_mtx, mtx_plain) != thrd_success) {
261  errc = get_errc();
262  goto error_init_p_mtx;
263  }
264 
265  if (cnd_init(&user->p_cond) != thrd_success) {
266  errc = get_errc();
267  goto error_init_p_cond;
268  }
269 
270  if (mtx_init(&user->c_mtx, mtx_plain) != thrd_success) {
271  errc = get_errc();
272  goto error_init_c_mtx;
273  }
274 
275  if (cnd_init(&user->c_cond) != thrd_success) {
276  errc = get_errc();
277  goto error_init_c_cond;
278  }
279 #endif
280 
281  spscring_init(&user->rxring, rxlen);
282  user->rxbuf = calloc(rxlen, sizeof(struct io_user_can_frame));
283  if (!user->rxbuf) {
284 #if !LELY_NO_ERRNO
285  errc = errno2c(errno);
286 #endif
287  goto error_alloc_rxbuf;
288  }
289 
290 #if !LELY_NO_THREADS
291  if (mtx_init(&user->mtx, mtx_plain) != thrd_success) {
292  errc = get_errc();
293  goto error_init_mtx;
294  }
295 #endif
296 
297  user->shutdown = 0;
298  user->read_posted = 0;
299  user->write_posted = 0;
300 
301  sllist_init(&user->read_queue);
302  sllist_init(&user->write_queue);
303  user->current_read = NULL;
304  user->current_write = NULL;
305 
306  io_ctx_insert(user->ctx, &user->svc);
307 
308  return chan;
309 
310 #if !LELY_NO_THREADS
311  // mtx_destroy(&user->mtx);
312 error_init_mtx:
313 #endif
314  free(user->rxbuf);
315 error_alloc_rxbuf:
316 #if !LELY_NO_THREADS
317  cnd_destroy(&user->c_cond);
318 error_init_c_cond:
319  mtx_destroy(&user->c_mtx);
320 error_init_c_mtx:
321  cnd_destroy(&user->p_cond);
322 error_init_p_cond:
323  mtx_destroy(&user->p_mtx);
324 error_init_p_mtx:
325 #endif
326  set_errc(errc);
327  return NULL;
328 }
329 
330 void
331 io_user_can_chan_fini(io_can_chan_t *chan)
332 {
333  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
334 
335  io_ctx_remove(user->ctx, &user->svc);
336  // Cancel all pending tasks.
337  io_user_can_chan_svc_shutdown(&user->svc);
338 
339  // Abort any consumer wait operation running in a task. Producer wait
340  // operations are only initiated by io_user_can_chan_on_msg() and
341  // io_user_can_chan_on_err(), and should have terminated before this
342  // function is called.
344 
345 #if !LELY_NO_THREADS
346  int warning = 0;
347  mtx_lock(&user->mtx);
348  // If necessary, busy-wait until io_user_can_chan_read_task_func() and
349  // io_user_can_chan_write_task_func() complete.
350  while (user->read_posted || user->write_posted) {
351  if (io_user_can_chan_do_abort_tasks(user))
352  continue;
353  mtx_unlock(&user->mtx);
354  if (!warning) {
355  warning = 1;
356  diag(DIAG_WARNING, 0,
357  "io_user_can_chan_fini() invoked with pending operations");
358  }
359  thrd_yield();
360  mtx_lock(&user->mtx);
361  }
362  mtx_unlock(&user->mtx);
363 
364  mtx_destroy(&user->mtx);
365 #endif
366 
367  free(user->rxbuf);
368 
369 #if !LELY_NO_THREADS
370  cnd_destroy(&user->c_cond);
371  mtx_destroy(&user->c_mtx);
372  cnd_destroy(&user->p_cond);
373  mtx_destroy(&user->p_mtx);
374 #endif
375 }
376 
380 {
381  int errc = 0;
382 
383  io_can_chan_t *chan = io_user_can_chan_alloc();
384  if (!chan) {
385  errc = get_errc();
386  goto error_alloc;
387  }
388 
389  io_can_chan_t *tmp = io_user_can_chan_init(
390  chan, ctx, exec, flags, rxlen, txtimeo, func, arg);
391  if (!tmp) {
392  errc = get_errc();
393  goto error_init;
394  }
395  chan = tmp;
396 
397  return chan;
398 
399 error_init:
400  io_user_can_chan_free((void *)chan);
401 error_alloc:
402  set_errc(errc);
403  return NULL;
404 }
405 
406 void
408 {
409  if (chan) {
410  io_user_can_chan_fini(chan);
411  io_user_can_chan_free((void *)chan);
412  }
413 }
414 
415 int
417  const struct timespec *tp, int timeout)
418 {
419  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
420  assert(msg);
421 
422 #if !LELY_NO_CANFD
423  int flags = 0;
424  if (msg->flags & CAN_FLAG_FDF)
426  if (msg->flags & CAN_FLAG_BRS)
428  if ((flags & user->flags) != flags) {
430  return -1;
431  }
432 #endif
433 
434  struct io_user_can_frame frame = { .is_err = 0,
435  .u.msg = *msg,
436  .ts = tp ? *tp : (struct timespec){ 0, 0 } };
437  return io_user_can_chan_on_frame(user, &frame, timeout);
438 }
439 
440 int
442  const struct timespec *tp, int timeout)
443 {
444  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
445  assert(err);
446 
447  if (!(user->flags & IO_CAN_BUS_FLAG_ERR)) {
449  return -1;
450  }
451 
452  struct io_user_can_frame frame = { .is_err = 1,
453  .u.err = *err,
454  .ts = tp ? *tp : (struct timespec){ 0, 0 } };
455  return io_user_can_chan_on_frame(user, &frame, timeout);
456 }
457 
458 static io_ctx_t *
459 io_user_can_chan_dev_get_ctx(const io_dev_t *dev)
460 {
461  const struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
462 
463  return user->ctx;
464 }
465 
466 static ev_exec_t *
467 io_user_can_chan_dev_get_exec(const io_dev_t *dev)
468 {
469  const struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
470 
471  return user->exec;
472 }
473 
474 static size_t
475 io_user_can_chan_dev_cancel(io_dev_t *dev, struct ev_task *task)
476 {
477  struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
478 
479  size_t n = 0;
480 
481  struct sllist read_queue, write_queue;
482  sllist_init(&read_queue);
483  sllist_init(&write_queue);
484 
485 #if !LELY_NO_THREADS
486  mtx_lock(&user->mtx);
487 #endif
488  if (user->current_read && (!task || task == user->current_read)) {
489  user->current_read = NULL;
490  n++;
491  }
492  if (user->current_write && (!task || task == user->current_write)) {
493  user->current_write = NULL;
494  n++;
495  }
496  io_user_can_chan_do_pop(user, &read_queue, &write_queue, task);
497 #if !LELY_NO_THREADS
498  mtx_unlock(&user->mtx);
499 #endif
500 
501  size_t nread = io_can_chan_read_queue_post(
502  &read_queue, -1, errnum2c(ERRNUM_CANCELED));
503  n = n < SIZE_MAX - nread ? n + nread : SIZE_MAX;
504  size_t nwrite = io_can_chan_write_queue_post(
505  &write_queue, errnum2c(ERRNUM_CANCELED));
506  n = n < SIZE_MAX - nwrite ? n + nwrite : SIZE_MAX;
507 
508  return n;
509 }
510 
511 static size_t
512 io_user_can_chan_dev_abort(io_dev_t *dev, struct ev_task *task)
513 {
514  struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
515 
516  struct sllist queue;
517  sllist_init(&queue);
518 
519 #if !LELY_NO_THREADS
520  mtx_lock(&user->mtx);
521 #endif
522  io_user_can_chan_do_pop(user, &queue, &queue, task);
523 #if !LELY_NO_THREADS
524  mtx_unlock(&user->mtx);
525 #endif
526 
527  return ev_task_queue_abort(&queue);
528 }
529 
530 static io_dev_t *
531 io_user_can_chan_get_dev(const io_can_chan_t *chan)
532 {
533  const struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
534 
535  return &user->dev_vptr;
536 }
537 
538 static int
539 io_user_can_chan_get_flags(const io_can_chan_t *chan)
540 {
541  const struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
542 
543  return user->flags;
544 }
545 
546 static int
547 io_user_can_chan_read(io_can_chan_t *chan, struct can_msg *msg,
548  struct can_err *err, struct timespec *tp, int timeout)
549 {
550  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
551 
552 #if !LELY_NO_THREADS
553  // Compute the absolute timeout for cnd_timedwait().
554  struct timespec ts = { 0, 0 };
555  if (timeout > 0) {
556 #if LELY_NO_TIMEOUT
557  timeout = 0;
558  (void)ts;
559 #else
560  if (!timespec_get(&ts, TIME_UTC))
561  return -1;
562  timespec_add_msec(&ts, timeout);
563 #endif
564  }
565 #endif
566 
567 #if !LELY_NO_THREADS
568  mtx_lock(&user->c_mtx);
569 #endif
570  size_t i = 0;
571  for (;;) {
572  // Check if a frame is available in the receive queue.
573  size_t n = 1;
574  i = spscring_c_alloc(&user->rxring, &n);
575  if (n)
576  break;
577  if (!timeout) {
578 #if !LELY_NO_THREADS
579  mtx_unlock(&user->c_mtx);
580 #endif
582  return -1;
583  }
584  // Submit a wait operation for a single frame.
585  // clang-format off
586  if (!spscring_c_submit_wait(&user->rxring, 1,
587  &io_user_can_chan_c_signal, user))
588  // clang-format on
589  // If the wait condition was already satisfied, try
590  // again.
591  continue;
592  // Wait for the buffer to signal that a frame is
593  // available, or time out if that takes too long.
594 #if !LELY_NO_THREADS
595  int result;
596 #if !LELY_NO_TIMEOUT
597  if (timeout > 0)
598  result = cnd_timedwait(
599  &user->c_cond, &user->c_mtx, &ts);
600  else
601 #endif
602  result = cnd_wait(&user->c_cond, &user->c_mtx);
603  if (result != thrd_success) {
604  mtx_unlock(&user->c_mtx);
605 #if !LELY_NO_TIMEOUT
606  if (result == thrd_timedout)
608 #endif
609  return -1;
610  }
611 #endif
612  }
613  // Copy the frame from the buffer.
614  struct io_user_can_frame *frame = &user->rxbuf[i];
615  int is_err = frame->is_err;
616  if (!is_err && msg)
617  *msg = frame->u.msg;
618  else if (is_err && err)
619  *err = frame->u.err;
620  if (tp)
621  *tp = frame->ts;
622  spscring_c_commit(&user->rxring, 1);
623 #if !LELY_NO_THREADS
624  mtx_unlock(&user->c_mtx);
625 #endif
626 
627  return !is_err;
628 }
629 
630 static void
631 io_user_can_chan_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
632 {
633  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
634  assert(read);
635  struct ev_task *task = &read->task;
636 
637  if (!task->exec)
638  task->exec = user->exec;
639  ev_exec_on_task_init(task->exec);
640 
641 #if !LELY_NO_THREADS
642  mtx_lock(&user->mtx);
643 #endif
644  if (user->shutdown) {
645 #if !LELY_NO_THREADS
646  mtx_unlock(&user->mtx);
647 #endif
648  io_can_chan_read_post(read, -1, errnum2c(ERRNUM_CANCELED));
649  } else {
650  int post_read = !user->read_posted
651  && sllist_empty(&user->read_queue);
652  sllist_push_back(&user->read_queue, &task->_node);
653  if (post_read)
654  user->read_posted = 1;
655 #if !LELY_NO_THREADS
656  mtx_unlock(&user->mtx);
657 #endif
658  // cppcheck-suppress duplicateCondition
659  if (post_read)
660  ev_exec_post(user->read_task.exec, &user->read_task);
661  }
662 }
663 
664 static int
665 io_user_can_chan_write(
666  io_can_chan_t *chan, const struct can_msg *msg, int timeout)
667 {
668  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
669  assert(msg);
670 
671 #if !LELY_NO_CANFD
672  int flags = 0;
673  if (msg->flags & CAN_FLAG_FDF)
675  if (msg->flags & CAN_FLAG_BRS)
677  if ((flags & user->flags) != flags) {
679  return -1;
680  }
681 #endif
682 
683  if (!user->func) {
685  return -1;
686  }
687 
688  return user->func(msg, timeout, user->arg);
689 }
690 
691 static void
692 io_user_can_chan_submit_write(
693  io_can_chan_t *chan, struct io_can_chan_write *write)
694 {
695  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
696  assert(write);
697  struct ev_task *task = &write->task;
698 
699 #if !LELY_NO_CANFD
700  int flags = 0;
701  if (write->msg->flags & CAN_FLAG_FDF)
702  flags |= IO_CAN_BUS_FLAG_FDF;
703  if (write->msg->flags & CAN_FLAG_BRS)
704  flags |= IO_CAN_BUS_FLAG_BRS;
705 #endif
706 
707  if (!task->exec)
708  task->exec = user->exec;
709  ev_exec_on_task_init(task->exec);
710 
711 #if !LELY_NO_THREADS
712  mtx_lock(&user->mtx);
713 #endif
714  if (user->shutdown) {
715 #if !LELY_NO_THREADS
716  mtx_unlock(&user->mtx);
717 #endif
718  io_can_chan_write_post(write, errnum2c(ERRNUM_CANCELED));
719 #if !LELY_NO_CANFD
720  } else if ((flags & user->flags) != flags) {
721 #if !LELY_NO_THREADS
722  mtx_unlock(&user->mtx);
723 #endif
724  io_can_chan_write_post(write, errnum2c(ERRNUM_INVAL));
725 #endif
726  } else if (!user->func) {
727 #if !LELY_NO_THREADS
728  mtx_unlock(&user->mtx);
729 #endif
730  io_can_chan_write_post(write, errnum2c(ERRNUM_NOSYS));
731  } else {
732  int post_write = !user->write_posted
733  && sllist_empty(&user->write_queue);
734  sllist_push_back(&user->write_queue, &task->_node);
735  if (post_write)
736  user->write_posted = 1;
737 #if !LELY_NO_THREADS
738  mtx_unlock(&user->mtx);
739 #endif
740  // cppcheck-suppress duplicateCondition
741  if (post_write)
742  ev_exec_post(user->write_task.exec, &user->write_task);
743  }
744 }
745 
746 static void
747 io_user_can_chan_svc_shutdown(struct io_svc *svc)
748 {
749  struct io_user_can_chan *user = io_user_can_chan_from_svc(svc);
750  io_dev_t *dev = &user->dev_vptr;
751 
752 #if !LELY_NO_THREADS
753  mtx_lock(&user->mtx);
754 #endif
755  int shutdown = !user->shutdown;
756  user->shutdown = 1;
757  if (shutdown)
758  // Try to abort io_user_can_chan_read_task_func() and
759  // io_user_can_chan_write_task_func().
760  io_user_can_chan_do_abort_tasks(user);
761 #if !LELY_NO_THREADS
762  mtx_unlock(&user->mtx);
763 #endif
764  // cppcheck-suppress duplicateCondition
765  if (shutdown)
766  // Cancel all pending operations.
767  io_user_can_chan_dev_cancel(dev, NULL);
768 }
769 
770 static void
771 io_user_can_chan_read_task_func(struct ev_task *task)
772 {
773  assert(task);
774  struct io_user_can_chan *user =
775  structof(task, struct io_user_can_chan, read_task);
776  io_can_chan_t *chan = &user->chan_vptr;
777 
778  int errsv = get_errc();
779 
780  int wouldblock = 0;
781 
782 #if !LELY_NO_THREADS
783  mtx_lock(&user->mtx);
784 #endif
785  // Try to process all pending read operations at once.
786  while ((task = user->current_read = ev_task_from_node(
787  sllist_pop_front(&user->read_queue)))) {
788 #if !LELY_NO_THREADS
789  mtx_unlock(&user->mtx);
790 #endif
791  struct io_can_chan_read *read =
793  int result = io_user_can_chan_read(
794  chan, read->msg, read->err, read->tp, 0);
795  int errc = result >= 0 ? 0 : get_errc();
796  wouldblock = errc == errnum2c(ERRNUM_AGAIN)
797  || errc == errnum2c(ERRNUM_WOULDBLOCK);
798  if (!wouldblock)
799  // The operation succeeded or failed immediately.
800  io_can_chan_read_post(read, result, errc);
801 #if !LELY_NO_THREADS
802  mtx_lock(&user->mtx);
803 #endif
804  if (task == user->current_read) {
805  // Put the read operation back on the queue if it would
806  // block, unless it was canceled.
807  if (wouldblock) {
809  &task->_node);
810  task = NULL;
811  }
812  user->current_read = NULL;
813  }
814  assert(!user->current_read);
815  // Stop if the operation did or would block.
816  if (wouldblock)
817  break;
818  }
819  // Repost this task if any read operations remain in the queue.
820  int post_read = !sllist_empty(&user->read_queue) && !user->shutdown;
821  // Register a wait operation if the receive queue is empty.
822  if (post_read && wouldblock) {
823 #if !LELY_NO_THREADS
824  mtx_lock(&user->c_mtx);
825 #endif
826  // Do not repost this task unless the wait condition can be
827  // satisfied immediately.
828  post_read = !spscring_c_submit_wait(&user->rxring, 1,
829  io_user_can_chan_c_signal, user);
830 #if !LELY_NO_THREADS
831  mtx_unlock(&user->c_mtx);
832 #endif
833  }
834  user->read_posted = post_read;
835 #if !LELY_NO_THREADS
836  mtx_unlock(&user->mtx);
837 #endif
838 
839  if (task && wouldblock)
840  // The operation would block but was canceled before it could be
841  // requeued.
842  io_can_chan_read_post(io_can_chan_read_from_task(task), -1,
844 
845  if (post_read)
846  ev_exec_post(user->read_task.exec, &user->read_task);
847 
848  set_errc(errsv);
849 }
850 
851 static void
852 io_user_can_chan_write_task_func(struct ev_task *task)
853 {
854  assert(task);
855  struct io_user_can_chan *user =
856  structof(task, struct io_user_can_chan, write_task);
857  io_can_chan_t *chan = &user->chan_vptr;
858 
859  int errsv = get_errc();
860 
861  int wouldblock = 0;
862 
863 #if !LELY_NO_THREADS
864  mtx_lock(&user->mtx);
865 #endif
866  // clang-format off
867  if ((task = user->current_write = ev_task_from_node(
868  sllist_pop_front(&user->write_queue)))) {
869  // clang-format on
870 #if !LELY_NO_THREADS
871  mtx_unlock(&user->mtx);
872 #endif
873  struct io_can_chan_write *write =
875  int result = io_user_can_chan_write(
876  chan, write->msg, user->txtimeo);
877  int errc = !result ? 0 : get_errc();
878  wouldblock = errc == errnum2c(ERRNUM_AGAIN)
880  if (!wouldblock)
881  // The operation succeeded or failed immediately.
882  io_can_chan_write_post(write, errc);
883 #if !LELY_NO_THREADS
884  mtx_lock(&user->mtx);
885 #endif
886  if (task == user->current_write) {
887  // Put the read operation back on the queue if it would
888  // block, unless it was canceled.
889  if (wouldblock) {
891  &task->_node);
892  task = NULL;
893  }
894  user->current_write = NULL;
895  }
896  assert(!user->current_write);
897  }
898  int post_write = user->write_posted =
899  !sllist_empty(&user->write_queue) && !user->shutdown;
900 #if !LELY_NO_THREADS
901  mtx_unlock(&user->mtx);
902 #endif
903 
904  if (task && wouldblock)
905  // The operation would block but was canceled before it could be
906  // requeued.
907  io_can_chan_write_post(io_can_chan_write_from_task(task),
909 
910  if (post_write)
911  ev_exec_post(user->write_task.exec, &user->write_task);
912 
913  set_errc(errsv);
914 }
915 
916 static inline struct io_user_can_chan *
917 io_user_can_chan_from_dev(const io_dev_t *dev)
918 {
919  assert(dev);
920 
921  return structof(dev, struct io_user_can_chan, dev_vptr);
922 }
923 
924 static inline struct io_user_can_chan *
925 io_user_can_chan_from_chan(const io_can_chan_t *chan)
926 {
927  assert(chan);
928 
929  return structof(chan, struct io_user_can_chan, chan_vptr);
930 }
931 
932 static inline struct io_user_can_chan *
933 io_user_can_chan_from_svc(const struct io_svc *svc)
934 {
935  assert(svc);
936 
937  return structof(svc, struct io_user_can_chan, svc);
938 }
939 
940 static int
941 io_user_can_chan_on_frame(struct io_user_can_chan *user,
942  const struct io_user_can_frame *frame, int timeout)
943 {
944  assert(user);
945  assert(frame);
946 
947 #if !LELY_NO_THREADS
948  // Compute the absolute timeout for cnd_timedwait().
949  struct timespec ts = { 0, 0 };
950  if (timeout > 0) {
951 #if LELY_NO_TIMEOUT
952  timeout = 0;
953  (void)ts;
954 #else
955  if (!timespec_get(&ts, TIME_UTC))
956  return -1;
957  timespec_add_msec(&ts, timeout);
958 #endif
959  }
960 #endif
961 
962 #if !LELY_NO_THREADS
963  mtx_lock(&user->p_mtx);
964 #endif
965  size_t i = 0;
966  for (;;) {
967  // Check if a slot is available in the receive queue.
968  size_t n = 1;
969  i = spscring_p_alloc(&user->rxring, &n);
970  if (n)
971  break;
972  if (!timeout) {
973 #if !LELY_NO_THREADS
974  mtx_unlock(&user->p_mtx);
975 #endif
977  return -1;
978  }
979  // clang-format off
980  if (!spscring_p_submit_wait(&user->rxring, 1,
981  &io_user_can_chan_p_signal, user))
982  // clang-format on
983  // If the wait condition was already satisfied, try
984  // again.
985  continue;
986 #if !LELY_NO_THREADS
987  int result;
988 #if !LELY_NO_TIMEOUT
989  // Wait for the buffer to signal that a slot is available, or
990  // time out if that takes too long.
991  if (timeout > 0)
992  result = cnd_timedwait(
993  &user->p_cond, &user->p_mtx, &ts);
994  else
995 #endif
996  result = cnd_wait(&user->p_cond, &user->p_mtx);
997  if (result != thrd_success) {
998  mtx_unlock(&user->p_mtx);
999 #if !LELY_NO_TIMEOUT
1000  if (result == thrd_timedout)
1002 #endif
1003  return -1;
1004  }
1005 #endif
1006  }
1007  // Copy the frame to the buffer.
1008  user->rxbuf[i] = *frame;
1009  spscring_p_commit(&user->rxring, 1);
1010 #if !LELY_NO_THREADS
1011  mtx_unlock(&user->p_mtx);
1012 #endif
1013 
1014  return 0;
1015 }
1016 
1017 static void
1018 io_user_can_chan_p_signal(struct spscring *ring, void *arg)
1019 {
1020  (void)ring;
1021 
1022 #if LELY_NO_THREADS
1023  (void)arg;
1024 #else
1025  struct io_user_can_chan *user = arg;
1026  assert(user);
1027 
1028  mtx_lock(&user->p_mtx);
1029  cnd_broadcast(&user->p_cond);
1030  mtx_unlock(&user->p_mtx);
1031 #endif
1032 }
1033 
1034 static void
1035 io_user_can_chan_c_signal(struct spscring *ring, void *arg)
1036 {
1037  (void)ring;
1038  struct io_user_can_chan *user = arg;
1039  assert(user);
1040 
1041 #if !LELY_NO_THREADS
1042  mtx_lock(&user->c_mtx);
1043  cnd_broadcast(&user->c_cond);
1044  mtx_unlock(&user->c_mtx);
1045 #endif
1046 
1047 #if !LELY_NO_THREADS
1048  mtx_lock(&user->mtx);
1049 #endif
1050  int post_read = !user->read_posted && !sllist_empty(&user->read_queue)
1051  && !user->shutdown;
1052  if (post_read)
1053  user->read_posted = 1;
1054 #if !LELY_NO_THREADS
1055  mtx_unlock(&user->mtx);
1056 #endif
1057  // cppcheck-suppress duplicateCondition
1058  if (post_read)
1059  ev_exec_post(user->read_task.exec, &user->read_task);
1060 }
1061 
1062 static void
1063 io_user_can_chan_do_pop(struct io_user_can_chan *user,
1064  struct sllist *read_queue, struct sllist *write_queue,
1065  struct ev_task *task)
1066 {
1067  assert(user);
1068  assert(read_queue);
1069  assert(write_queue);
1070 
1071  if (!task) {
1074  } else if (sllist_remove(&user->read_queue, &task->_node)) {
1075  sllist_push_back(read_queue, &task->_node);
1076  } else if (sllist_remove(&user->write_queue, &task->_node)) {
1077  sllist_push_back(write_queue, &task->_node);
1078  }
1079 }
1080 
1081 static size_t
1082 io_user_can_chan_do_abort_tasks(struct io_user_can_chan *user)
1083 {
1084  size_t n = 0;
1085 
1086  // Try to abort io_user_can_chan_read_task_func().
1087  // clang-format off
1088  if (user->read_posted && ev_exec_abort(user->read_task.exec,
1089  &user->read_task)) {
1090  // clang-format on
1091  user->read_posted = 0;
1092  n++;
1093  }
1094 
1095  // Try to abort io_user_can_chan_write_task_func().
1096  // clang-format off
1097  if (user->write_posted && ev_exec_abort(user->write_task.exec,
1098  &user->write_task)) {
1099  // clang-format on
1100  user->write_posted = 0;
1101  n++;
1102  }
1103 
1104  return n;
1105 }
1106 
1107 #endif // !LELY_NO_MALLOC
@ CAN_FLAG_FDF
The FD Format (FDF) flag, formerly known as Extended Data Length (EDL).
Definition: msg.h:54
@ CAN_FLAG_BRS
The Bit Rate Switch (BRS) flag (only available in CAN FD format frames).
Definition: msg.h:62
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
This header file is part of the utilities library; it contains the diagnostic declarations.
@ DIAG_WARNING
A warning.
Definition: diag.h:55
void diag(enum diag_severity severity, int errc, const char *format,...)
Emits a diagnostic message.
Definition: diag.c:171
This header file is part of the utilities library; it contains the native and platform-independent er...
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition: errnum.c:810
@ ERRNUM_NOSYS
Function not supported.
Definition: errnum.h:184
@ ERRNUM_WOULDBLOCK
Operation would block.
Definition: errnum.h:233
@ ERRNUM_INVAL
Invalid argument.
Definition: errnum.h:132
@ ERRNUM_AGAIN
Resource unavailable, try again.
Definition: errnum.h:89
@ ERRNUM_CANCELED
Operation canceled.
Definition: errnum.h:99
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:424
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
@ IO_CAN_BUS_FLAG_BRS
Bit Rate Switch support is enabled.
Definition: can.h:44
@ IO_CAN_BUS_FLAG_FDF
FD Format (formerly Extended Data Length) support is enabled.
Definition: can.h:42
@ IO_CAN_BUS_FLAG_ERR
Reception of error frames is enabled.
Definition: can.h:39
#define LELY_IO_TX_TIMEOUT
The default timeout (in milliseconds) for I/O write operations.
Definition: io2.h:36
This header file is part of the I/O library; it contains the user-defined CAN channel declarations.
int io_user_can_chan_write_t(const struct can_msg *msg, int timeout, void *arg)
The type of function invoked by a user-defined CAN channel when a CAN frame needs to be written.
Definition: can.h:50
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
struct io_can_chan_write * io_can_chan_write_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel write operation from a pointer to its completion task.
Definition: can.c:99
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:93
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
#define LELY_IO_USER_CAN_RXLEN
The default receive queue length (in number of CAN frames) of the user-defined CAN channel.
Definition: can.c:47
void io_user_can_chan_destroy(io_can_chan_t *chan)
Destroys a user-defined CAN channel.
Definition: can.c:407
int io_user_can_chan_on_msg(io_can_chan_t *chan, const struct can_msg *msg, const struct timespec *tp, int timeout)
Processes an incoming CAN frame.
Definition: can.c:416
int io_user_can_chan_on_err(io_can_chan_t *chan, const struct can_err *err, const struct timespec *tp, int timeout)
Processes an incoming CAN error frame.
Definition: can.c:441
io_can_chan_t * io_user_can_chan_create(io_ctx_t *ctx, ev_exec_t *exec, int flags, size_t rxlen, int txtimeo, io_user_can_chan_write_t *func, void *arg)
Creates a new user-defined CAN channel.
Definition: can.c:378
int timespec_get(struct timespec *ts, int base)
Sets the interval at ts to hold the current calendar time based on the specified time base.
Definition: time.c:32
#define TIME_UTC
An integer constant greater than 0 that designates the UTC time base.
Definition: time.h:207
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
void sllist_push_front(struct sllist *list, struct slnode *node)
Pushes a node to the front of a singly-linked list.
Definition: sllist.h:221
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
This header file is part of the utilities library; it contains the single-producer, single-consumer ring buffer declarations.
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer, single-consumer ring buffer for reading.
Definition: spscring.c:262
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
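Checks if the requested range of indices, including wrapping, in a single-producer, single-consumer ring buffer is available for reading and, if not, registers func to be invoked once the range becomes available.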
Definition: spscring.c:327
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation registered by the consumer, invokes the function registered for that wait operation.
Definition: spscring.c:138
int spscring_p_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
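Checks if the requested range of indices, including wrapping, in a single-producer, single-consumer ring buffer is available for writing and, if not, registers func to be invoked once the range becomes available.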
Definition: spscring.c:163
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation registered by the producer, invokes the function registered for that wait operation.
Definition: spscring.c:302
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer, single-consumer ring buffer for writing.
Definition: spscring.c:98
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib.h> and defines any missing functionality.
A CAN error frame.
Definition: err.h:28
A CAN or CAN FD format frame.
Definition: msg.h:87
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI).
Definition: msg.h:94
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A CAN channel read operation.
Definition: can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
struct timespec * tp
The address at which to store the system time at which the CAN frame or CAN error frame was received.
Definition: can.h:93
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
A CAN channel write operation.
Definition: can.h:110
const struct can_msg * msg
A pointer to the CAN frame to be written.
Definition: can.h:116
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the write operation.
Definition: can.h:121
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: can.h:126
Definition: ctx.c:38
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
The implementation of a user-defined CAN channel.
Definition: can.c:105
struct ev_task write_task
The task responsible for initiating write operations.
Definition: can.c:135
void * arg
The user-specific value to be passed as the third argument to func.
Definition: can.c:131
ev_exec_t * exec
A pointer to the executor used to execute all I/O tasks.
Definition: can.c:115
unsigned write_posted
A flag indicating whether write_task has been posted to exec.
Definition: can.c:162
const struct io_can_chan_vtbl * chan_vptr
A pointer to the virtual table for the CAN channel interface.
Definition: can.c:109
int txtimeo
The timeout (in milliseconds) when writing a CAN frame asynchronously.
Definition: can.c:122
cnd_t p_cond
The condition variable used to wake up the receive queue producer.
Definition: can.c:140
struct io_svc svc
The I/O service representing the channel.
Definition: can.c:111
struct spscring rxring
The ring buffer used to control the receive queue.
Definition: can.c:147
const struct io_dev_vtbl * dev_vptr
A pointer to the virtual table for the I/O device interface.
Definition: can.c:107
struct ev_task * current_write
The write operation currently being executed.
Definition: can.c:170
io_user_can_chan_write_t * func
A pointer to the function to be invoked when a CAN frame needs to be written.
Definition: can.c:127
cnd_t c_cond
The condition variable used to wake up the receive queue consumer.
Definition: can.c:144
mtx_t p_mtx
The mutex protecting the receive queue producer.
Definition: can.c:138
io_ctx_t * ctx
A pointer to the I/O context with which the channel is registered.
Definition: can.c:113
struct ev_task * current_read
The read operation currently being executed.
Definition: can.c:168
unsigned shutdown
A flag indicating whether the I/O service has been shut down.
Definition: can.c:158
struct ev_task read_task
The task responsible for initiating read operations.
Definition: can.c:133
unsigned read_posted
A flag indicating whether read_task has been posted to exec.
Definition: can.c:160
mtx_t mtx
The mutex protecting the channel and the queues of pending operations.
Definition: can.c:155
struct sllist read_queue
The queue containing pending read operations.
Definition: can.c:164
mtx_t c_mtx
The mutex protecting the receive queue consumer.
Definition: can.c:142
struct sllist write_queue
The queue containing pending write operations.
Definition: can.c:166
int flags
The flags specifying which CAN bus features are enabled.
Definition: can.c:117
struct io_user_can_frame * rxbuf
The receive queue.
Definition: can.c:149
A singly-linked list.
Definition: sllist.h:52
A single-producer, single-consumer ring buffer.
Definition: spscring.h:63
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads.h>, if it exists, and defines any missing functionality.
int cnd_init(cnd_t *cond)
Creates a condition variable.
int cnd_timedwait(cnd_t *cond, mtx_t *mtx, const struct timespec *ts)
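Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is signaled by a call to cnd_signal() or cnd_broadcast(), or until after the TIME_UTC-based calendar time at ts.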
int cnd_broadcast(cnd_t *cond)
Unblocks all of the threads that are blocked on the condition variable at cond at the time of the call.
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values: mtx_plain, mtx_timed, mtx_plain | mtx_recursive, or mtx_timed | mtx_recursive.
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
pthread_cond_t cnd_t
A complete object type that holds an identifier for a condition variable.
Definition: threads.h:78
void cnd_destroy(cnd_t *cond)
Releases all resources used by the condition variable at cond.
int cnd_wait(cnd_t *cond, mtx_t *mtx)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is signaled by a call to cnd_signal() or cnd_broadcast().
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run.
@ thrd_timedout
Indicates that the time specified in the call was reached without acquiring the requested resource.
Definition: threads.h:128
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
This header file is part of the utilities library; it contains the time function declarations.
void timespec_add_msec(struct timespec *tp, uint_least64_t msec)
Adds msec milliseconds to the time at tp.
Definition: time.h:141