Lely core libraries  2.2.5
can.c
Go to the documentation of this file.
1 
24 #include "../can.h"
25 #if !LELY_NO_THREADS
26 #include <lely/libc/threads.h>
27 #endif
28 #include <lely/io2/ctx.h>
29 #include <lely/io2/user/can.h>
30 #include <lely/util/errnum.h>
31 #include <lely/util/spscring.h>
32 #include <lely/util/time.h>
33 #include <lely/util/diag.h>
34 #include <lely/util/util.h>
35 
36 #include <assert.h>
37 #include <stdlib.h>
38 
39 #ifndef LELY_IO_USER_CAN_RXLEN
44 #define LELY_IO_USER_CAN_RXLEN 1024
45 #endif
46 
48  int is_err;
49  union {
50  struct can_msg msg;
51  struct can_err err;
52  } u;
53  struct timespec ts;
54 };
55 
56 static io_ctx_t *io_user_can_chan_dev_get_ctx(const io_dev_t *dev);
57 static ev_exec_t *io_user_can_chan_dev_get_exec(const io_dev_t *dev);
58 static size_t io_user_can_chan_dev_cancel(io_dev_t *dev, struct ev_task *task);
59 static size_t io_user_can_chan_dev_abort(io_dev_t *dev, struct ev_task *task);
60 
61 // clang-format off
62 static const struct io_dev_vtbl io_user_can_chan_dev_vtbl = {
63  &io_user_can_chan_dev_get_ctx,
64  &io_user_can_chan_dev_get_exec,
65  &io_user_can_chan_dev_cancel,
66  &io_user_can_chan_dev_abort
67 };
68 // clang-format on
69 
70 static io_dev_t *io_user_can_chan_get_dev(const io_can_chan_t *chan);
71 static int io_user_can_chan_get_flags(const io_can_chan_t *chan);
72 static int io_user_can_chan_read(io_can_chan_t *chan, struct can_msg *msg,
73  struct can_err *err, struct timespec *tp, int timeout);
74 static void io_user_can_chan_submit_read(
75  io_can_chan_t *chan, struct io_can_chan_read *read);
76 static int io_user_can_chan_write(
77  io_can_chan_t *chan, const struct can_msg *msg, int timeout);
78 static void io_user_can_chan_submit_write(
79  io_can_chan_t *chan, struct io_can_chan_write *write);
80 
81 // clang-format off
82 static const struct io_can_chan_vtbl io_user_can_chan_vtbl = {
83  &io_user_can_chan_get_dev,
84  &io_user_can_chan_get_flags,
85  &io_user_can_chan_read,
86  &io_user_can_chan_submit_read,
87  &io_user_can_chan_write,
88  &io_user_can_chan_submit_write
89 };
90 // clang-format on
91 
92 static void io_user_can_chan_svc_shutdown(struct io_svc *svc);
93 
94 // clang-format off
95 static const struct io_svc_vtbl io_user_can_chan_svc_vtbl = {
96  NULL,
97  &io_user_can_chan_svc_shutdown
98 };
99 // clang-format on
100 
104  const struct io_dev_vtbl *dev_vptr;
108  struct io_svc svc;
114  int flags;
119  int txtimeo;
128  void *arg;
130  struct ev_task read_task;
132  struct ev_task write_task;
133 #if !LELY_NO_THREADS
142 #endif
144  struct spscring rxring;
147 #if !LELY_NO_THREADS
153 #endif
155  unsigned shutdown : 1;
157  unsigned read_posted : 1;
159  unsigned write_posted : 1;
161  struct sllist read_queue;
163  struct sllist write_queue;
168 };
169 
170 static void io_user_can_chan_read_task_func(struct ev_task *task);
171 static void io_user_can_chan_write_task_func(struct ev_task *task);
172 
173 static inline struct io_user_can_chan *io_user_can_chan_from_dev(
174  const io_dev_t *dev);
175 static inline struct io_user_can_chan *io_user_can_chan_from_chan(
176  const io_can_chan_t *chan);
177 static inline struct io_user_can_chan *io_user_can_chan_from_svc(
178  const struct io_svc *svc);
179 
180 static int io_user_can_chan_on_frame(struct io_user_can_chan *user,
181  const struct io_user_can_frame *frame, int timeout);
182 
183 static void io_user_can_chan_p_signal(struct spscring *ring, void *arg);
184 static void io_user_can_chan_c_signal(struct spscring *ring, void *arg);
185 
186 static void io_user_can_chan_do_pop(struct io_user_can_chan *user,
187  struct sllist *read_queue, struct sllist *write_queue,
188  struct ev_task *task);
189 
190 static size_t io_user_can_chan_do_abort_tasks(struct io_user_can_chan *user);
191 
192 void *
193 io_user_can_chan_alloc(void)
194 {
195  struct io_user_can_chan *user = malloc(sizeof(*user));
196  if (!user)
197  set_errc(errno2c(errno));
198  // cppcheck-suppress memleak symbolName=user
199  return user ? &user->chan_vptr : NULL;
200 }
201 
202 void
203 io_user_can_chan_free(void *ptr)
204 {
205  if (ptr)
206  free(io_user_can_chan_from_chan(ptr));
207 }
208 
210 io_user_can_chan_init(io_can_chan_t *chan, io_ctx_t *ctx, ev_exec_t *exec,
211  int flags, size_t rxlen, int txtimeo,
213 {
214  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
215  assert(ctx);
216  assert(exec);
217 
218  if (flags & ~IO_CAN_BUS_FLAG_MASK) {
220  return NULL;
221  }
222 
223  if (!rxlen)
224  rxlen = LELY_IO_USER_CAN_RXLEN;
225 
226  if (!txtimeo)
228 
229  int errc = 0;
230 
231  user->dev_vptr = &io_user_can_chan_dev_vtbl;
232  user->chan_vptr = &io_user_can_chan_vtbl;
233 
234  user->svc = (struct io_svc)IO_SVC_INIT(&io_user_can_chan_svc_vtbl);
235  user->ctx = ctx;
236 
237  user->exec = exec;
238 
239  user->flags = flags;
240  user->txtimeo = txtimeo;
241 
242  user->func = func;
243  user->arg = arg;
244 
245  user->read_task = (struct ev_task)EV_TASK_INIT(
246  user->exec, &io_user_can_chan_read_task_func);
247  user->write_task = (struct ev_task)EV_TASK_INIT(
248  user->exec, &io_user_can_chan_write_task_func);
249 
250 #if !LELY_NO_THREADS
251  if (mtx_init(&user->p_mtx, mtx_plain) != thrd_success) {
252  errc = get_errc();
253  goto error_init_p_mtx;
254  }
255 
256  if (cnd_init(&user->p_cond) != thrd_success) {
257  errc = get_errc();
258  goto error_init_p_cond;
259  }
260 
261  if (mtx_init(&user->c_mtx, mtx_plain) != thrd_success) {
262  errc = get_errc();
263  goto error_init_c_mtx;
264  }
265 
266  if (cnd_init(&user->c_cond) != thrd_success) {
267  errc = get_errc();
268  goto error_init_c_cond;
269  }
270 #endif
271 
272  spscring_init(&user->rxring, rxlen);
273  user->rxbuf = calloc(rxlen, sizeof(struct io_user_can_frame));
274  if (!user->rxbuf) {
275  errc = errno2c(errno);
276  goto error_alloc_rxbuf;
277  }
278 
279 #if !LELY_NO_THREADS
280  if (mtx_init(&user->mtx, mtx_plain) != thrd_success) {
281  errc = get_errc();
282  goto error_init_mtx;
283  }
284 #endif
285 
286  user->shutdown = 0;
287  user->read_posted = 0;
288  user->write_posted = 0;
289 
290  sllist_init(&user->read_queue);
291  sllist_init(&user->write_queue);
292  user->current_read = NULL;
293  user->current_write = NULL;
294 
295  io_ctx_insert(user->ctx, &user->svc);
296 
297  return chan;
298 
299 #if !LELY_NO_THREADS
300  // mtx_destroy(&user->mtx);
301 error_init_mtx:
302 #endif
303  free(user->rxbuf);
304 error_alloc_rxbuf:
305 #if !LELY_NO_THREADS
306  cnd_destroy(&user->c_cond);
307 error_init_c_cond:
308  mtx_destroy(&user->c_mtx);
309 error_init_c_mtx:
310  cnd_destroy(&user->p_cond);
311 error_init_p_cond:
312  mtx_destroy(&user->p_mtx);
313 error_init_p_mtx:
314 #endif
315  set_errc(errc);
316  return NULL;
317 }
318 
319 void
320 io_user_can_chan_fini(io_can_chan_t *chan)
321 {
322  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
323 
324  io_ctx_remove(user->ctx, &user->svc);
325  // Cancel all pending tasks.
326  io_user_can_chan_svc_shutdown(&user->svc);
327 
328  // Abort any consumer wait operation running in a task. Producer wait
329  // operations are only initiated by io_user_can_chan_on_msg() and
330  // io_user_can_chan_on_err(), and should have terminated before this
331  // function is called.
333 
334 #if !LELY_NO_THREADS
335  int warning = 0;
336  mtx_lock(&user->mtx);
337  // If necessary, busy-wait until io_user_can_chan_read_task_func() and
338  // io_user_can_chan_write_task_func() complete.
339  while (user->read_posted || user->write_posted) {
340  if (io_user_can_chan_do_abort_tasks(user))
341  continue;
342  mtx_unlock(&user->mtx);
343  if (!warning) {
344  warning = 1;
345  diag(DIAG_WARNING, 0,
346  "io_user_can_chan_fini() invoked with pending operations");
347  }
348  thrd_yield();
349  mtx_lock(&user->mtx);
350  }
351  mtx_unlock(&user->mtx);
352 
353  mtx_destroy(&user->mtx);
354 #endif
355 
356  free(user->rxbuf);
357 
358 #if !LELY_NO_THREADS
359  cnd_destroy(&user->c_cond);
360  mtx_destroy(&user->c_mtx);
361  cnd_destroy(&user->p_cond);
362  mtx_destroy(&user->p_mtx);
363 #endif
364 }
365 
369 {
370  int errc = 0;
371 
372  io_can_chan_t *chan = io_user_can_chan_alloc();
373  if (!chan) {
374  errc = get_errc();
375  goto error_alloc;
376  }
377 
378  io_can_chan_t *tmp = io_user_can_chan_init(
379  chan, ctx, exec, flags, rxlen, txtimeo, func, arg);
380  if (!tmp) {
381  errc = get_errc();
382  goto error_init;
383  }
384  chan = tmp;
385 
386  return chan;
387 
388 error_init:
389  io_user_can_chan_free((void *)chan);
390 error_alloc:
391  set_errc(errc);
392  return NULL;
393 }
394 
395 void
397 {
398  if (chan) {
399  io_user_can_chan_fini(chan);
400  io_user_can_chan_free((void *)chan);
401  }
402 }
403 
404 int
406  const struct timespec *tp, int timeout)
407 {
408  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
409  assert(msg);
410 
411 #if !LELY_NO_CANFD
412  int flags = 0;
413  if (msg->flags & CAN_FLAG_FDF)
415  if (msg->flags & CAN_FLAG_BRS)
417  if ((flags & user->flags) != flags) {
419  return -1;
420  }
421 #endif
422 
423  struct io_user_can_frame frame = { .is_err = 0,
424  .u.msg = *msg,
425  .ts = tp ? *tp : (struct timespec){ 0, 0 } };
426  return io_user_can_chan_on_frame(user, &frame, timeout);
427 }
428 
429 int
431  const struct timespec *tp, int timeout)
432 {
433  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
434  assert(err);
435 
436  if (!(user->flags & IO_CAN_BUS_FLAG_ERR)) {
438  return -1;
439  }
440 
441  struct io_user_can_frame frame = { .is_err = 1,
442  .u.err = *err,
443  .ts = tp ? *tp : (struct timespec){ 0, 0 } };
444  return io_user_can_chan_on_frame(user, &frame, timeout);
445 }
446 
447 static io_ctx_t *
448 io_user_can_chan_dev_get_ctx(const io_dev_t *dev)
449 {
450  const struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
451 
452  return user->ctx;
453 }
454 
455 static ev_exec_t *
456 io_user_can_chan_dev_get_exec(const io_dev_t *dev)
457 {
458  const struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
459 
460  return user->exec;
461 }
462 
463 static size_t
464 io_user_can_chan_dev_cancel(io_dev_t *dev, struct ev_task *task)
465 {
466  struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
467 
468  size_t n = 0;
469 
470  struct sllist read_queue, write_queue;
471  sllist_init(&read_queue);
472  sllist_init(&write_queue);
473 
474 #if !LELY_NO_THREADS
475  mtx_lock(&user->mtx);
476 #endif
477  if (user->current_read && (!task || task == user->current_read)) {
478  user->current_read = NULL;
479  n++;
480  }
481  if (user->current_write && (!task || task == user->current_write)) {
482  user->current_write = NULL;
483  n++;
484  }
485  io_user_can_chan_do_pop(user, &read_queue, &write_queue, task);
486 #if !LELY_NO_THREADS
487  mtx_unlock(&user->mtx);
488 #endif
489 
490  size_t nread = io_can_chan_read_queue_post(
491  &read_queue, -1, errnum2c(ERRNUM_CANCELED));
492  n = n < SIZE_MAX - nread ? n + nread : SIZE_MAX;
493  size_t nwrite = io_can_chan_write_queue_post(
494  &write_queue, errnum2c(ERRNUM_CANCELED));
495  n = n < SIZE_MAX - nwrite ? n + nwrite : SIZE_MAX;
496 
497  return n;
498 }
499 
500 static size_t
501 io_user_can_chan_dev_abort(io_dev_t *dev, struct ev_task *task)
502 {
503  struct io_user_can_chan *user = io_user_can_chan_from_dev(dev);
504 
505  struct sllist queue;
506  sllist_init(&queue);
507 
508 #if !LELY_NO_THREADS
509  mtx_lock(&user->mtx);
510 #endif
511  io_user_can_chan_do_pop(user, &queue, &queue, task);
512 #if !LELY_NO_THREADS
513  mtx_unlock(&user->mtx);
514 #endif
515 
516  return ev_task_queue_abort(&queue);
517 }
518 
519 static io_dev_t *
520 io_user_can_chan_get_dev(const io_can_chan_t *chan)
521 {
522  const struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
523 
524  return &user->dev_vptr;
525 }
526 
527 static int
528 io_user_can_chan_get_flags(const io_can_chan_t *chan)
529 {
530  const struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
531 
532  return user->flags;
533 }
534 
535 static int
536 io_user_can_chan_read(io_can_chan_t *chan, struct can_msg *msg,
537  struct can_err *err, struct timespec *tp, int timeout)
538 {
539  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
540 
541 #if !LELY_NO_THREADS
542  // Compute the absolute timeout for cnd_timedwait().
543  struct timespec ts = { 0, 0 };
544  if (timeout > 0) {
545 #if LELY_NO_TIMEOUT
546  timeout = 0;
547  (void)ts;
548 #else
549  if (!timespec_get(&ts, TIME_UTC))
550  return -1;
551  timespec_add_msec(&ts, timeout);
552 #endif
553  }
554 #endif
555 
556 #if !LELY_NO_THREADS
557  mtx_lock(&user->c_mtx);
558 #endif
559  size_t i = 0;
560  for (;;) {
561  // Check if a frame is available in the receive queue.
562  size_t n = 1;
563  i = spscring_c_alloc(&user->rxring, &n);
564  if (n)
565  break;
566  if (!timeout) {
567 #if !LELY_NO_THREADS
568  mtx_unlock(&user->c_mtx);
569 #endif
571  return -1;
572  }
573  // Submit a wait operation for a single frame.
574  // clang-format off
575  if (!spscring_c_submit_wait(&user->rxring, 1,
576  &io_user_can_chan_c_signal, user))
577  // clang-format on
578  // If the wait condition was already satisfied, try
579  // again.
580  continue;
581  // Wait for the buffer to signal that a frame is
582  // available, or time out if that takes too long.
583 #if !LELY_NO_THREADS
584  int result;
585 #if !LELY_NO_TIMEOUT
586  if (timeout > 0)
587  result = cnd_timedwait(
588  &user->c_cond, &user->c_mtx, &ts);
589  else
590 #endif
591  result = cnd_wait(&user->c_cond, &user->c_mtx);
592  if (result != thrd_success) {
593  mtx_unlock(&user->c_mtx);
594 #if !LELY_NO_TIMEOUT
595  if (result == thrd_timedout)
597 #endif
598  return -1;
599  }
600 #endif
601  }
602  // Copy the frame from the buffer.
603  struct io_user_can_frame *frame = &user->rxbuf[i];
604  int is_err = frame->is_err;
605  if (!is_err && msg)
606  *msg = frame->u.msg;
607  else if (is_err && err)
608  *err = frame->u.err;
609  if (tp)
610  *tp = frame->ts;
611  spscring_c_commit(&user->rxring, 1);
612 #if !LELY_NO_THREADS
613  mtx_unlock(&user->c_mtx);
614 #endif
615 
616  return !is_err;
617 }
618 
619 static void
620 io_user_can_chan_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
621 {
622  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
623  assert(read);
624  struct ev_task *task = &read->task;
625 
626  if (!task->exec)
627  task->exec = user->exec;
628  ev_exec_on_task_init(task->exec);
629 
630 #if !LELY_NO_THREADS
631  mtx_lock(&user->mtx);
632 #endif
633  if (user->shutdown) {
634 #if !LELY_NO_THREADS
635  mtx_unlock(&user->mtx);
636 #endif
637  io_can_chan_read_post(read, -1, errnum2c(ERRNUM_CANCELED));
638  } else {
639  int post_read = !user->read_posted
640  && sllist_empty(&user->read_queue);
641  sllist_push_back(&user->read_queue, &task->_node);
642  if (post_read)
643  user->read_posted = 1;
644 #if !LELY_NO_THREADS
645  mtx_unlock(&user->mtx);
646 #endif
647  // cppcheck-suppress duplicateCondition
648  if (post_read)
649  ev_exec_post(user->read_task.exec, &user->read_task);
650  }
651 }
652 
653 static int
654 io_user_can_chan_write(
655  io_can_chan_t *chan, const struct can_msg *msg, int timeout)
656 {
657  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
658  assert(msg);
659 
660 #if !LELY_NO_CANFD
661  int flags = 0;
662  if (msg->flags & CAN_FLAG_FDF)
664  if (msg->flags & CAN_FLAG_BRS)
666  if ((flags & user->flags) != flags) {
668  return -1;
669  }
670 #endif
671 
672  if (!user->func) {
674  return -1;
675  }
676 
677  return user->func(msg, timeout, user->arg);
678 }
679 
680 static void
681 io_user_can_chan_submit_write(
682  io_can_chan_t *chan, struct io_can_chan_write *write)
683 {
684  struct io_user_can_chan *user = io_user_can_chan_from_chan(chan);
685  assert(write);
686  struct ev_task *task = &write->task;
687 
688 #if !LELY_NO_CANFD
689  int flags = 0;
690  if (write->msg->flags & CAN_FLAG_FDF)
691  flags |= IO_CAN_BUS_FLAG_FDF;
692  if (write->msg->flags & CAN_FLAG_BRS)
693  flags |= IO_CAN_BUS_FLAG_BRS;
694 #endif
695 
696  if (!task->exec)
697  task->exec = user->exec;
698  ev_exec_on_task_init(task->exec);
699 
700 #if !LELY_NO_THREADS
701  mtx_lock(&user->mtx);
702 #endif
703  if (user->shutdown) {
704 #if !LELY_NO_THREADS
705  mtx_unlock(&user->mtx);
706 #endif
707  io_can_chan_write_post(write, errnum2c(ERRNUM_CANCELED));
708 #if !LELY_NO_CANFD
709  } else if ((flags & user->flags) != flags) {
710 #if !LELY_NO_THREADS
711  mtx_unlock(&user->mtx);
712 #endif
713  io_can_chan_write_post(write, errnum2c(ERRNUM_INVAL));
714 #endif
715  } else if (!user->func) {
716 #if !LELY_NO_THREADS
717  mtx_unlock(&user->mtx);
718 #endif
719  io_can_chan_write_post(write, errnum2c(ERRNUM_NOSYS));
720  } else {
721  int post_write = !user->write_posted
722  && sllist_empty(&user->write_queue);
723  sllist_push_back(&user->write_queue, &task->_node);
724  if (post_write)
725  user->write_posted = 1;
726 #if !LELY_NO_THREADS
727  mtx_unlock(&user->mtx);
728 #endif
729  // cppcheck-suppress duplicateCondition
730  if (post_write)
731  ev_exec_post(user->write_task.exec, &user->write_task);
732  }
733 }
734 
735 static void
736 io_user_can_chan_svc_shutdown(struct io_svc *svc)
737 {
738  struct io_user_can_chan *user = io_user_can_chan_from_svc(svc);
739  io_dev_t *dev = &user->dev_vptr;
740 
741 #if !LELY_NO_THREADS
742  mtx_lock(&user->mtx);
743 #endif
744  int shutdown = !user->shutdown;
745  user->shutdown = 1;
746  if (shutdown)
747  // Try to abort io_user_can_chan_read_task_func() and
748  // io_user_can_chan_write_task_func().
749  io_user_can_chan_do_abort_tasks(user);
750 #if !LELY_NO_THREADS
751  mtx_unlock(&user->mtx);
752 #endif
753  // cppcheck-suppress duplicateCondition
754  if (shutdown)
755  // Cancel all pending operations.
756  io_user_can_chan_dev_cancel(dev, NULL);
757 }
758 
759 static void
760 io_user_can_chan_read_task_func(struct ev_task *task)
761 {
762  assert(task);
763  struct io_user_can_chan *user =
764  structof(task, struct io_user_can_chan, read_task);
765  io_can_chan_t *chan = &user->chan_vptr;
766 
767  int errsv = get_errc();
768 
769  int wouldblock = 0;
770 
771 #if !LELY_NO_THREADS
772  mtx_lock(&user->mtx);
773 #endif
774  // Try to process all pending read operations at once.
775  while ((task = user->current_read = ev_task_from_node(
776  sllist_pop_front(&user->read_queue)))) {
777 #if !LELY_NO_THREADS
778  mtx_unlock(&user->mtx);
779 #endif
780  struct io_can_chan_read *read =
782  int result = io_user_can_chan_read(
783  chan, read->msg, read->err, read->tp, 0);
784  int errc = result >= 0 ? 0 : get_errc();
785  wouldblock = errc == errnum2c(ERRNUM_AGAIN)
786  || errc == errnum2c(ERRNUM_WOULDBLOCK);
787  if (!wouldblock)
788  // The operation succeeded or failed immediately.
789  io_can_chan_read_post(read, result, errc);
790 #if !LELY_NO_THREADS
791  mtx_lock(&user->mtx);
792 #endif
793  if (task == user->current_read) {
794  // Put the read operation back on the queue if it would
795  // block, unless it was canceled.
796  if (wouldblock) {
798  &task->_node);
799  task = NULL;
800  }
801  user->current_read = NULL;
802  }
803  assert(!user->current_read);
804  // Stop if the operation did or would block.
805  if (wouldblock)
806  break;
807  }
808  // Repost this task if any read operations remain in the queue.
809  int post_read = !sllist_empty(&user->read_queue) && !user->shutdown;
810  // Register a wait operation if the receive queue is empty.
811  if (post_read && wouldblock) {
812 #if !LELY_NO_THREADS
813  mtx_lock(&user->c_mtx);
814 #endif
815  // Do not repost this task unless the wait condition can be
816  // satisfied immediately.
817  post_read = !spscring_c_submit_wait(&user->rxring, 1,
818  io_user_can_chan_c_signal, user);
819 #if !LELY_NO_THREADS
820  mtx_unlock(&user->c_mtx);
821 #endif
822  }
823  user->read_posted = post_read;
824 #if !LELY_NO_THREADS
825  mtx_unlock(&user->mtx);
826 #endif
827 
828  if (task && wouldblock)
829  // The operation would block but was canceled before it could be
830  // requeued.
831  io_can_chan_read_post(io_can_chan_read_from_task(task), -1,
833 
834  if (post_read)
835  ev_exec_post(user->read_task.exec, &user->read_task);
836 
837  set_errc(errsv);
838 }
839 
840 static void
841 io_user_can_chan_write_task_func(struct ev_task *task)
842 {
843  assert(task);
844  struct io_user_can_chan *user =
845  structof(task, struct io_user_can_chan, write_task);
846  io_can_chan_t *chan = &user->chan_vptr;
847 
848  int errsv = get_errc();
849 
850  int wouldblock = 0;
851 
852 #if !LELY_NO_THREADS
853  mtx_lock(&user->mtx);
854 #endif
855  // clang-format off
856  if ((task = user->current_write = ev_task_from_node(
857  sllist_pop_front(&user->write_queue)))) {
858  // clang-format on
859 #if !LELY_NO_THREADS
860  mtx_unlock(&user->mtx);
861 #endif
862  struct io_can_chan_write *write =
864  int result = io_user_can_chan_write(
865  chan, write->msg, user->txtimeo);
866  int errc = !result ? 0 : get_errc();
867  wouldblock = errc == errnum2c(ERRNUM_AGAIN)
869  if (!wouldblock)
870  // The operation succeeded or failed immediately.
871  io_can_chan_write_post(write, errc);
872 #if !LELY_NO_THREADS
873  mtx_lock(&user->mtx);
874 #endif
875  if (task == user->current_write) {
876  // Put the read operation back on the queue if it would
877  // block, unless it was canceled.
878  if (wouldblock) {
880  &task->_node);
881  task = NULL;
882  }
883  user->current_write = NULL;
884  }
885  assert(!user->current_write);
886  }
887  int post_write = user->write_posted =
888  !sllist_empty(&user->write_queue) && !user->shutdown;
889 #if !LELY_NO_THREADS
890  mtx_unlock(&user->mtx);
891 #endif
892 
893  if (task && wouldblock)
894  // The operation would block but was canceled before it could be
895  // requeued.
896  io_can_chan_write_post(io_can_chan_write_from_task(task),
898 
899  if (post_write)
900  ev_exec_post(user->write_task.exec, &user->write_task);
901 
902  set_errc(errsv);
903 }
904 
905 static inline struct io_user_can_chan *
906 io_user_can_chan_from_dev(const io_dev_t *dev)
907 {
908  assert(dev);
909 
910  return structof(dev, struct io_user_can_chan, dev_vptr);
911 }
912 
913 static inline struct io_user_can_chan *
914 io_user_can_chan_from_chan(const io_can_chan_t *chan)
915 {
916  assert(chan);
917 
918  return structof(chan, struct io_user_can_chan, chan_vptr);
919 }
920 
921 static inline struct io_user_can_chan *
922 io_user_can_chan_from_svc(const struct io_svc *svc)
923 {
924  assert(svc);
925 
926  return structof(svc, struct io_user_can_chan, svc);
927 }
928 
929 static int
930 io_user_can_chan_on_frame(struct io_user_can_chan *user,
931  const struct io_user_can_frame *frame, int timeout)
932 {
933  assert(user);
934  assert(frame);
935 
936 #if !LELY_NO_THREADS
937  // Compute the absolute timeout for cnd_timedwait().
938  struct timespec ts = { 0, 0 };
939  if (timeout > 0) {
940 #if LELY_NO_TIMEOUT
941  timeout = 0;
942  (void)ts;
943 #else
944  if (!timespec_get(&ts, TIME_UTC))
945  return -1;
946  timespec_add_msec(&ts, timeout);
947 #endif
948  }
949 #endif
950 
951 #if !LELY_NO_THREADS
952  mtx_lock(&user->p_mtx);
953 #endif
954  size_t i = 0;
955  for (;;) {
956  // Check if a slot is available in the receive queue.
957  size_t n = 1;
958  i = spscring_p_alloc(&user->rxring, &n);
959  if (n)
960  break;
961  if (!timeout) {
962 #if !LELY_NO_THREADS
963  mtx_unlock(&user->p_mtx);
964 #endif
966  return -1;
967  }
968  // clang-format off
969  if (!spscring_p_submit_wait(&user->rxring, 1,
970  &io_user_can_chan_p_signal, user))
971  // clang-format on
972  // If the wait condition was already satisfied, try
973  // again.
974  continue;
975 #if !LELY_NO_THREADS
976  int result;
977 #if !LELY_NO_TIMEOUT
978  // Wait for the buffer to signal that a slot is available, or
979  // time out if that takes too long.
980  if (timeout > 0)
981  result = cnd_timedwait(
982  &user->p_cond, &user->p_mtx, &ts);
983  else
984 #endif
985  result = cnd_wait(&user->p_cond, &user->p_mtx);
986  if (result != thrd_success) {
987  mtx_unlock(&user->p_mtx);
988 #if !LELY_NO_TIMEOUT
989  if (result == thrd_timedout)
991 #endif
992  return -1;
993  }
994 #endif
995  }
996  // Copy the frame to the buffer.
997  user->rxbuf[i] = *frame;
998  spscring_p_commit(&user->rxring, 1);
999 #if !LELY_NO_THREADS
1000  mtx_unlock(&user->p_mtx);
1001 #endif
1002 
1003  return 0;
1004 }
1005 
1006 static void
1007 io_user_can_chan_p_signal(struct spscring *ring, void *arg)
1008 {
1009  (void)ring;
1010 
1011 #if LELY_NO_THREADS
1012  (void)arg;
1013 #else
1014  struct io_user_can_chan *user = arg;
1015  assert(user);
1016 
1017  mtx_lock(&user->p_mtx);
1018  cnd_broadcast(&user->p_cond);
1019  mtx_unlock(&user->p_mtx);
1020 #endif
1021 }
1022 
1023 static void
1024 io_user_can_chan_c_signal(struct spscring *ring, void *arg)
1025 {
1026  (void)ring;
1027  struct io_user_can_chan *user = arg;
1028  assert(user);
1029 
1030 #if !LELY_NO_THREADS
1031  mtx_lock(&user->c_mtx);
1032  cnd_broadcast(&user->c_cond);
1033  mtx_unlock(&user->c_mtx);
1034 #endif
1035 
1036 #if !LELY_NO_THREADS
1037  mtx_lock(&user->mtx);
1038 #endif
1039  int post_read = !user->read_posted && !sllist_empty(&user->read_queue)
1040  && !user->shutdown;
1041  if (post_read)
1042  user->read_posted = 1;
1043 #if !LELY_NO_THREADS
1044  mtx_unlock(&user->mtx);
1045 #endif
1046  // cppcheck-suppress duplicateCondition
1047  if (post_read)
1048  ev_exec_post(user->read_task.exec, &user->read_task);
1049 }
1050 
1051 static void
1052 io_user_can_chan_do_pop(struct io_user_can_chan *user,
1053  struct sllist *read_queue, struct sllist *write_queue,
1054  struct ev_task *task)
1055 {
1056  assert(user);
1057  assert(read_queue);
1058  assert(write_queue);
1059 
1060  if (!task) {
1063  } else if (sllist_remove(&user->read_queue, &task->_node)) {
1064  sllist_push_back(read_queue, &task->_node);
1065  } else if (sllist_remove(&user->write_queue, &task->_node)) {
1066  sllist_push_back(write_queue, &task->_node);
1067  }
1068 }
1069 
1070 static size_t
1071 io_user_can_chan_do_abort_tasks(struct io_user_can_chan *user)
1072 {
1073  size_t n = 0;
1074 
1075  // Try to abort io_user_can_chan_read_task_func().
1076  // clang-format off
1077  if (user->read_posted && ev_exec_abort(user->read_task.exec,
1078  &user->read_task)) {
1079  // clang-format on
1080  user->read_posted = 0;
1081  n++;
1082  }
1083 
1084  // Try to abort io_user_can_chan_write_task_func().
1085  // clang-format off
1086  if (user->write_posted && ev_exec_abort(user->write_task.exec,
1087  &user->write_task)) {
1088  // clang-format on
1089  user->write_posted = 0;
1090  n++;
1091  }
1092 
1093  return n;
1094 }
@ CAN_FLAG_FDF
The FD Format (FDF) flag, formerly known as Extended Data Length (EDL).
Definition: msg.h:54
@ CAN_FLAG_BRS
The Bit Rate Switch (BRS) flag (only available in CAN FD format frames).
Definition: msg.h:62
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:121
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:136
This header file is part of the utilities library; it contains the diagnostic declarations.
@ DIAG_WARNING
A warning.
Definition: diag.h:47
void diag(enum diag_severity severity, int errc, const char *format,...)
Emits a diagnostic message.
Definition: diag.c:156
This header file is part of the utilities library; it contains the native and platform-independent er...
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition: errnum.c:825
@ ERRNUM_NOSYS
Function not supported.
Definition: errnum.h:181
@ ERRNUM_WOULDBLOCK
Operation would block.
Definition: errnum.h:230
@ ERRNUM_INVAL
Invalid argument.
Definition: errnum.h:129
@ ERRNUM_AGAIN
Resource unavailable, try again.
Definition: errnum.h:86
@ ERRNUM_CANCELED
Operation canceled.
Definition: errnum.h:96
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:947
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:957
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:43
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:375
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:138
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:126
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:108
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
@ IO_CAN_BUS_FLAG_BRS
Bit Rate Switch support is enabled.
Definition: can.h:44
@ IO_CAN_BUS_FLAG_FDF
FD Format (formerly Extended Data Length) support is enabled.
Definition: can.h:42
@ IO_CAN_BUS_FLAG_ERR
Reception of error frames is enabled.
Definition: can.h:39
#define LELY_IO_TX_TIMEOUT
The default timeout (in milliseconds) for I/O write operations.
Definition: io2.h:36
This header file is part of the I/O library; it contains the user-defined CAN channel declarations.
int io_user_can_chan_write_t(const struct can_msg *msg, int timeout, void *arg)
The type of function invoked by a user-defined CAN channel when a CAN frame needs to be written.
Definition: can.h:50
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
struct io_can_chan_write * io_can_chan_write_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel write operation from a pointer to its completion task.
Definition: can.c:97
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:91
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
#define LELY_IO_USER_CAN_RXLEN
The default receive queue length (in number of CAN frames) of the user-defined CAN channel.
Definition: can.c:44
void io_user_can_chan_destroy(io_can_chan_t *chan)
Destroys a user-defined CAN channel.
Definition: can.c:396
int io_user_can_chan_on_msg(io_can_chan_t *chan, const struct can_msg *msg, const struct timespec *tp, int timeout)
Processes an incoming CAN frame.
Definition: can.c:405
int io_user_can_chan_on_err(io_can_chan_t *chan, const struct can_err *err, const struct timespec *tp, int timeout)
Processes an incoming CAN error frame.
Definition: can.c:430
io_can_chan_t * io_user_can_chan_create(io_ctx_t *ctx, ev_exec_t *exec, int flags, size_t rxlen, int txtimeo, io_user_can_chan_write_t *func, void *arg)
Creates a new user-defined CAN channel.
Definition: can.c:367
int timespec_get(struct timespec *ts, int base)
Sets the interval at ts to hold the current calendar time based on the specified time base.
Definition: time.c:32
#define TIME_UTC
An integer constant greater than 0 that designates the UTC time base.
Definition: time.h:202
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:184
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:233
void sllist_push_front(struct sllist *list, struct slnode *node)
Pushes a node to the front of a singly-linked list.
Definition: sllist.h:205
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:213
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:190
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:221
This header file is part of the utilities library; it contains the single-producer,...
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:262
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:327
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition: spscring.c:138
int spscring_p_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:163
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition: spscring.c:302
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:98
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
A CAN error frame.
Definition: err.h:28
A CAN or CAN FD format frame.
Definition: msg.h:87
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI...
Definition: msg.h:94
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A CAN channel read operation.
Definition: can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
struct timespec * tp
The address at which to store the system time at which the CAN frame or CAN error frame was received.
Definition: can.h:93
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
A CAN channel write operation.
Definition: can.h:110
const struct can_msg * msg
A pointer to the CAN frame to be written.
Definition: can.h:116
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the write operation.
Definition: can.h:121
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: can.h:126
Definition: ctx.c:35
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
The implementation of a user-defined CAN channel.
Definition: can.c:102
struct ev_task write_task
The task responsible for initiating write operations.
Definition: can.c:132
void * arg
The user-specific value to be passed as the second argument to func.
Definition: can.c:128
ev_exec_t * exec
A pointer to the executor used to execute all I/O tasks.
Definition: can.c:112
unsigned write_posted
A flag indicating whether write_task has been posted to exec.
Definition: can.c:159
const struct io_can_chan_vtbl * chan_vptr
A pointer to the virtual table for the CAN channel interface.
Definition: can.c:106
int txtimeo
The timeout (in milliseconds) when writing a CAN frame asynchronously.
Definition: can.c:119
cnd_t p_cond
The condition variable used to wake up the receive queue producer.
Definition: can.c:137
struct io_svc svc
The I/O service representing the channel.
Definition: can.c:108
struct spscring rxring
The ring buffer used to control the receive queue.
Definition: can.c:144
const struct io_dev_vtbl * dev_vptr
A pointer to the virtual table for the I/O device interface.
Definition: can.c:104
struct ev_task * current_write
The write operation currently being executed.
Definition: can.c:167
io_user_can_chan_write_t * func
A pointer to the function to be invoked when a CAN frame needs to be written.
Definition: can.c:124
cnd_t c_cond
The condition variable used to wake up the receive queue consumer.
Definition: can.c:141
mtx_t p_mtx
The mutex protecting the receive queue producer.
Definition: can.c:135
io_ctx_t * ctx
A pointer to the I/O context with which the channel is registered.
Definition: can.c:110
struct ev_task * current_read
The read operation currently being executed.
Definition: can.c:165
unsigned shutdown
A flag indicating whether the I/O service has been shut down.
Definition: can.c:155
struct ev_task read_task
The task responsible for initiating read operations.
Definition: can.c:130
unsigned read_posted
A flag indicating whether read_task has been posted to exec.
Definition: can.c:157
mtx_t mtx
The mutex protecting the channel and the queues of pending operations.
Definition: can.c:152
struct sllist read_queue
The queue containing pending read operations.
Definition: can.c:161
mtx_t c_mtx
The mutex protecting the receive queue consumer.
Definition: can.c:139
struct sllist write_queue
The queue containing pending write operations.
Definition: can.c:163
int flags
The flags specifying which CAN bus features are enabled.
Definition: can.c:114
struct io_user_can_frame * rxbuf
The receive queue.
Definition: can.c:146
A singly-linked list.
Definition: sllist.h:51
A single-producer, single-consumer ring buffer.
Definition: spscring.h:61
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int cnd_init(cnd_t *cond)
Creates a condition variable.
int cnd_timedwait(cnd_t *cond, mtx_t *mtx, const struct timespec *ts)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is si...
int cnd_broadcast(cnd_t *cond)
Unblocks all of the threads that are blocked on the condition variable at cond at the time of the cal...
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
pthread_cond_t cnd_t
A complete object type that holds an identifier for a condition variable.
Definition: threads.h:78
void cnd_destroy(cnd_t *cond)
Releases all resources used by the condition variable at cond.
int cnd_wait(cnd_t *cond, mtx_t *mtx)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is si...
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run...
@ thrd_timedout
Indicates that the time specified in the call was reached without acquiring the requested resource.
Definition: threads.h:128
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
This header file is part of the utilities library; it contains the time function declarations.
void timespec_add_msec(struct timespec *tp, uint_least64_t msec)
Adds msec milliseconds to the time at tp.
Definition: time.h:135