Lely core libraries  2.2.5
can_rt.c
Go to the documentation of this file.
1 
24 #include "io2.h"
25 #if !LELY_NO_THREADS
26 #include <lely/libc/threads.h>
27 #endif
28 #include <lely/ev/exec.h>
29 #include <lely/ev/strand.h>
30 #include <lely/io2/can_rt.h>
31 #include <lely/io2/ctx.h>
32 #include <lely/util/errnum.h>
33 #include <lely/util/util.h>
34 
35 #include <assert.h>
36 #include <stdlib.h>
37 
38 static io_ctx_t *io_can_rt_dev_get_ctx(const io_dev_t *dev);
39 static ev_exec_t *io_can_rt_dev_get_exec(const io_dev_t *dev);
40 static size_t io_can_rt_dev_cancel(io_dev_t *dev, struct ev_task *task);
41 static size_t io_can_rt_dev_abort(io_dev_t *dev, struct ev_task *task);
42 
// clang-format off
// The virtual table implementing the abstract I/O device interface
// (io_dev_t) for the CAN frame router; see io_can_rt_get_dev().
static const struct io_dev_vtbl io_can_rt_dev_vtbl = {
	&io_can_rt_dev_get_ctx,
	&io_can_rt_dev_get_exec,
	&io_can_rt_dev_cancel,
	&io_can_rt_dev_abort
};
// clang-format on
51 
52 static void io_can_rt_svc_shutdown(struct io_svc *svc);
53 
// clang-format off
// The virtual table of the I/O service representing the CAN frame router.
// Only the shutdown function is implemented; the first entry is unused here.
static const struct io_svc_vtbl io_can_rt_svc_vtbl = {
	NULL,
	&io_can_rt_svc_shutdown
};
// clang-format on
60 
/// A CAN frame router.
struct io_can_rt {
	/// A pointer to the virtual table of the abstract I/O device.
	const struct io_dev_vtbl *dev_vptr;
	/// The I/O service registered with #ctx.
	struct io_svc svc;
	/// A pointer to the I/O context with which the router is registered.
	io_ctx_t *ctx;
	/// A pointer to the CAN channel from which frames are read.
	io_can_chan_t *chan;
	/// The strand executor (created in io_can_rt_init()) used to
	/// serialize the router's completion tasks.
	ev_exec_t *exec;
	/// The buffer receiving the next CAN frame read from #chan.
	struct can_msg msg;
	/// The buffer receiving the next CAN error frame read from #chan.
	struct can_err err;
	/// The single read operation kept pending on #chan.
	struct io_can_chan_read read;
	/// The task (io_can_rt_task_func()) resubmitting #read after each
	/// completed read.
	struct ev_task task;
#if !LELY_NO_THREADS
	/// The mutex protecting #shutdown, #submitted, #msg_queue and
	/// #err_queue.
	mtx_t mtx;
#endif
	/// A flag indicating the service has been shut down.
	unsigned shutdown : 1;
	/// A flag indicating #read has been submitted to #chan.
	unsigned submitted : 1;
	/// The tree of pending CAN frame read operations, keyed by
	/// (id, flags) (see io_can_rt_read_msg_cmp()).
	struct rbtree msg_queue;
	/// The queue of pending CAN error frame read operations.
	struct sllist err_queue;
};
79 
80 static void io_can_rt_read_func(struct ev_task *task);
81 static void io_can_rt_task_func(struct ev_task *task);
82 
83 static inline io_can_rt_t *io_can_rt_from_dev(const io_dev_t *dev);
84 static inline io_can_rt_t *io_can_rt_from_svc(const struct io_svc *svc);
85 
86 static void io_can_rt_do_pop(io_can_rt_t *rt, struct sllist *msg_queue,
87  struct sllist *err_queue, struct ev_task *task);
88 static void io_can_rt_do_pop_read_msg(io_can_rt_t *rt, struct sllist *queue,
89  struct io_can_rt_read_msg *read_msg);
90 static void io_can_rt_do_pop_read_err(io_can_rt_t *rt, struct sllist *queue,
91  struct io_can_rt_read_err *read_err);
92 
93 static void io_can_rt_read_msg_post(struct io_can_rt_read_msg *read_msg,
94  const struct can_msg *msg, int errc);
95 static size_t io_can_rt_read_msg_queue_post(
96  struct sllist *queue, const struct can_msg *msg, int errc);
97 
98 static void io_can_rt_read_err_post(struct io_can_rt_read_err *read_err,
99  const struct can_err *err, int errc);
100 static size_t io_can_rt_read_err_queue_post(
101  struct sllist *queue, const struct can_err *err, int errc);
102 
103 static int io_can_rt_read_msg_cmp(const void *p1, const void *p2);
104 
106  ev_promise_t *promise;
107  struct io_can_rt_read_msg read_msg;
108  struct can_msg msg;
109 };
110 
111 static void io_can_rt_async_read_msg_func(struct ev_task *task);
112 
114  ev_promise_t *promise;
115  struct io_can_rt_read_err read_err;
116  struct can_err err;
117 };
118 
119 static void io_can_rt_async_read_err_func(struct ev_task *task);
120 
122  ev_promise_t *promise;
123  struct ev_task task;
124 };
125 
126 static void io_can_rt_async_shutdown_func(struct ev_task *task);
127 
128 void *
129 io_can_rt_alloc(void)
130 {
131  void *ptr = malloc(sizeof(io_can_rt_t));
132  if (!ptr)
133  set_errc(errno2c(errno));
134  return ptr;
135 }
136 
void
io_can_rt_free(void *ptr)
{
	// free(NULL) is a no-op, so no guard is necessary.
	free(ptr);
}
142 
143 io_can_rt_t *
144 io_can_rt_init(io_can_rt_t *rt, io_can_chan_t *chan, ev_exec_t *exec)
145 {
146  assert(rt);
147  assert(chan);
148  io_dev_t *dev = io_can_chan_get_dev(chan);
149  assert(dev);
150  io_ctx_t *ctx = io_dev_get_ctx(dev);
151  assert(ctx);
152  if (!exec)
153  exec = io_dev_get_exec(dev);
154  assert(exec);
155 
156  int errsv = 0;
157 
158  rt->dev_vptr = &io_can_rt_dev_vtbl;
159 
160  rt->svc = (struct io_svc)IO_SVC_INIT(&io_can_rt_svc_vtbl);
161  rt->ctx = ctx;
162 
163  rt->chan = chan;
164 
165  rt->exec = ev_strand_create(exec);
166  if (!rt->exec) {
167  errsv = get_errc();
168  goto error_create_strand;
169  }
170 
171  rt->msg = (struct can_msg)CAN_MSG_INIT;
172  rt->err = (struct can_err)CAN_ERR_INIT;
173  rt->read = (struct io_can_chan_read)IO_CAN_CHAN_READ_INIT(&rt->msg,
174  &rt->err, NULL, rt->exec, &io_can_rt_read_func);
175 
176  rt->task = (struct ev_task)EV_TASK_INIT(rt->exec, &io_can_rt_task_func);
177 
178 #if !LELY_NO_THREADS
179  if (mtx_init(&rt->mtx, mtx_plain) != thrd_success) {
180  errsv = get_errc();
181  goto error_init_mtx;
182  }
183 #endif
184 
185  rt->shutdown = 0;
186  rt->submitted = 0;
187 
188  rbtree_init(&rt->msg_queue, &io_can_rt_read_msg_cmp);
189  sllist_init(&rt->err_queue);
190 
191  io_ctx_insert(rt->ctx, &rt->svc);
192 
193  return rt;
194 
195 #if !LELY_NO_THREADS
196  // mtx_destroy(&rt->mtx);
197 error_init_mtx:
198 #endif
199  ev_strand_destroy(rt->exec);
200 error_create_strand:
201  errno = errsv;
202  return NULL;
203 }
204 
/// Finalizes a CAN frame router initialized by io_can_rt_init().
/// The teardown order mirrors initialization in reverse: unregister the
/// service first so no new operations arrive, then destroy the mutex and
/// finally the strand executor.
void
io_can_rt_fini(io_can_rt_t *rt)
{
	assert(rt);

	io_ctx_remove(rt->ctx, &rt->svc);

#if !LELY_NO_THREADS
	mtx_destroy(&rt->mtx);
#endif

	ev_strand_destroy(rt->exec);
}
218 
219 io_can_rt_t *
221 {
222  int errc = 0;
223 
224  io_can_rt_t *rt = io_can_rt_alloc();
225  if (!rt) {
226  errc = get_errc();
227  goto error_alloc;
228  }
229 
230  io_can_rt_t *tmp = io_can_rt_init(rt, chan, exec);
231  if (!tmp) {
232  errc = get_errc();
233  goto error_init;
234  }
235  rt = tmp;
236 
237  return rt;
238 
239 error_init:
240  io_can_rt_free(rt);
241 error_alloc:
242  set_errc(errc);
243  return NULL;
244 }
245 
246 void
248 {
249  if (rt) {
250  io_can_rt_fini(rt);
251  io_can_rt_free(rt);
252  }
253 }
254 
255 io_dev_t *
257 {
258  assert(rt);
259 
260  return &rt->dev_vptr;
261 }
262 
265 {
266  assert(rt);
267 
268  return rt->chan;
269 }
270 
271 void
273 {
274  assert(rt);
275  assert(read_msg);
276  struct ev_task *task = &read_msg->task;
277 
278  task->exec = rt->exec;
279  ev_exec_on_task_init(task->exec);
280 
281 #if !LELY_NO_THREADS
282  mtx_lock(&rt->mtx);
283 #endif
284  if (rt->shutdown) {
285 #if !LELY_NO_THREADS
286  mtx_unlock(&rt->mtx);
287 #endif
288  io_can_rt_read_msg_post(
289  read_msg, NULL, errnum2c(ERRNUM_CANCELED));
290  } else {
291  task->_data = &rt->msg_queue;
292  struct rbnode *node = rbtree_find(&rt->msg_queue, read_msg);
293  if (node) {
294  read_msg = structof(
295  node, struct io_can_rt_read_msg, _node);
296  sllist_push_back(&read_msg->_queue, &task->_node);
297  } else {
298  rbnode_init(&read_msg->_node, read_msg);
299  sllist_init(&read_msg->_queue);
300  rbtree_insert(&rt->msg_queue, &read_msg->_node);
301  }
302  int submit = !rt->submitted;
303  if (submit)
304  rt->submitted = 1;
305 #if !LELY_NO_THREADS
306  mtx_unlock(&rt->mtx);
307 #endif
308  if (submit)
309  io_can_chan_submit_read(rt->chan, &rt->read);
310  }
311 }
312 
313 size_t
315 {
316  assert(rt);
317  assert(read_msg);
318  struct ev_task *task = &read_msg->task;
319 
320  struct sllist queue;
321  sllist_init(&queue);
322 
323 #if !LELY_NO_THREADS
324  mtx_lock(&rt->mtx);
325 #endif
326  if (task->_data == &rt->msg_queue)
327  io_can_rt_do_pop_read_msg(rt, &queue, read_msg);
328 #if !LELY_NO_THREADS
329  mtx_unlock(&rt->mtx);
330 #endif
331 
332  return io_can_rt_read_msg_queue_post(
333  &queue, NULL, errnum2c(ERRNUM_CANCELED));
334 }
335 
336 size_t
338 {
339  assert(rt);
340  assert(read_msg);
341  struct ev_task *task = &read_msg->task;
342 
343  struct sllist queue;
344  sllist_init(&queue);
345 
346 #if !LELY_NO_THREADS
347  mtx_lock(&rt->mtx);
348 #endif
349  if (task->_data == &rt->msg_queue)
350  io_can_rt_do_pop_read_msg(rt, &queue, read_msg);
351 #if !LELY_NO_THREADS
352  mtx_unlock(&rt->mtx);
353 #endif
354 
355  return ev_task_queue_abort(&queue);
356 }
357 
358 ev_future_t *
359 io_can_rt_async_read_msg(io_can_rt_t *rt, uint_least32_t id,
360  uint_least8_t flags, struct io_can_rt_read_msg **pread_msg)
361 {
362  ev_promise_t *promise = ev_promise_create(
363  sizeof(struct io_can_rt_async_read_msg), NULL);
364  if (!promise)
365  return NULL;
366 
367  struct io_can_rt_async_read_msg *async_read_msg =
368  ev_promise_data(promise);
369  async_read_msg->promise = promise;
370  async_read_msg->read_msg =
372  flags, &io_can_rt_async_read_msg_func);
373  async_read_msg->msg = (struct can_msg)CAN_MSG_INIT;
374 
375  io_can_rt_submit_read_msg(rt, &async_read_msg->read_msg);
376 
377  if (pread_msg)
378  *pread_msg = &async_read_msg->read_msg;
379 
380  return ev_promise_get_future(promise);
381 }
382 
383 void
385 {
386  assert(rt);
387  assert(read_err);
388  struct ev_task *task = &read_err->task;
389 
390  task->exec = rt->exec;
391  ev_exec_on_task_init(task->exec);
392 
393 #if !LELY_NO_THREADS
394  mtx_lock(&rt->mtx);
395 #endif
396  if (rt->shutdown) {
397 #if !LELY_NO_THREADS
398  mtx_unlock(&rt->mtx);
399 #endif
400  io_can_rt_read_err_post(
401  read_err, NULL, errnum2c(ERRNUM_CANCELED));
402  } else {
403  task->_data = &rt->err_queue;
404  sllist_push_back(&rt->err_queue, &task->_node);
405  int submit = !rt->submitted;
406  if (submit)
407  rt->submitted = 1;
408 #if !LELY_NO_THREADS
409  mtx_unlock(&rt->mtx);
410 #endif
411  if (submit)
412  io_can_chan_submit_read(rt->chan, &rt->read);
413  }
414 }
415 
416 size_t
418 {
419  assert(rt);
420  assert(read_err);
421  struct ev_task *task = &read_err->task;
422 
423  struct sllist queue;
424  sllist_init(&queue);
425 
426 #if !LELY_NO_THREADS
427  mtx_lock(&rt->mtx);
428 #endif
429  if (task->_data == &rt->err_queue)
430  io_can_rt_do_pop_read_err(rt, &queue, read_err);
431 #if !LELY_NO_THREADS
432  mtx_unlock(&rt->mtx);
433 #endif
434 
435  return io_can_rt_read_err_queue_post(
436  &queue, NULL, errnum2c(ERRNUM_CANCELED));
437 }
438 
439 size_t
441 {
442  assert(rt);
443  assert(read_err);
444  struct ev_task *task = &read_err->task;
445 
446  struct sllist queue;
447  sllist_init(&queue);
448 
449 #if !LELY_NO_THREADS
450  mtx_lock(&rt->mtx);
451 #endif
452  if (task->_data == &rt->err_queue)
453  io_can_rt_do_pop_read_err(rt, &queue, read_err);
454 #if !LELY_NO_THREADS
455  mtx_unlock(&rt->mtx);
456 #endif
457 
458  return ev_task_queue_abort(&queue);
459 }
460 
461 ev_future_t *
463 {
464  ev_promise_t *promise = ev_promise_create(
465  sizeof(struct io_can_rt_async_read_err), NULL);
466  if (!promise)
467  return NULL;
468 
469  struct io_can_rt_async_read_err *async_read_err =
470  ev_promise_data(promise);
471  async_read_err->promise = promise;
472  async_read_err->read_err =
474  &io_can_rt_async_read_err_func);
475  async_read_err->err = (struct can_err)CAN_ERR_INIT;
476 
477  io_can_rt_submit_read_err(rt, &async_read_err->read_err);
478 
479  if (pread_err)
480  *pread_err = &async_read_err->read_err;
481 
482  return ev_promise_get_future(promise);
483 }
484 
485 ev_future_t *
487 {
488  ev_promise_t *promise = ev_promise_create(
489  sizeof(struct io_can_rt_async_shutdown), NULL);
490  if (!promise)
491  return NULL;
492 
493  struct io_can_rt_async_shutdown *async_shutdown =
494  ev_promise_data(promise);
495  async_shutdown->promise = promise;
496  async_shutdown->task = (struct ev_task)EV_TASK_INIT(
497  rt->exec, &io_can_rt_async_shutdown_func);
498 
499  io_can_rt_svc_shutdown(&rt->svc);
500  ev_exec_post(async_shutdown->task.exec, &async_shutdown->task);
501 
502  return ev_promise_get_future(promise);
503 }
504 
505 struct io_can_rt_read_msg *
507 {
508  return task ? structof(task, struct io_can_rt_read_msg, task) : NULL;
509 }
510 
511 struct io_can_rt_read_err *
513 {
514  return task ? structof(task, struct io_can_rt_read_err, task) : NULL;
515 }
516 
517 static io_ctx_t *
518 io_can_rt_dev_get_ctx(const io_dev_t *dev)
519 {
520  const io_can_rt_t *rt = io_can_rt_from_dev(dev);
521 
522  return rt->ctx;
523 }
524 
525 static ev_exec_t *
526 io_can_rt_dev_get_exec(const io_dev_t *dev)
527 {
528  const io_can_rt_t *rt = io_can_rt_from_dev(dev);
529 
530  return rt->exec;
531 }
532 
// Implements io_dev_cancel() for the CAN frame router: pops the matching
// operation(s) (all of them if task is NULL) under the lock, then posts the
// completions with ERRNUM_CANCELED outside the lock.
static size_t
io_can_rt_dev_cancel(io_dev_t *dev, struct ev_task *task)
{
	io_can_rt_t *rt = io_can_rt_from_dev(dev);

	struct sllist msg_queue, err_queue;
	sllist_init(&msg_queue);
	sllist_init(&err_queue);

#if !LELY_NO_THREADS
	mtx_lock(&rt->mtx);
#endif
	io_can_rt_do_pop(rt, &msg_queue, &err_queue, task);
#if !LELY_NO_THREADS
	mtx_unlock(&rt->mtx);
#endif
	size_t nmsg = io_can_rt_read_msg_queue_post(
			&msg_queue, NULL, errnum2c(ERRNUM_CANCELED));
	size_t nerr = io_can_rt_read_err_queue_post(
			&err_queue, NULL, errnum2c(ERRNUM_CANCELED));
	// Saturate at SIZE_MAX rather than wrapping on overflow.
	return nerr < SIZE_MAX - nmsg ? nmsg + nerr : SIZE_MAX;
}
555 
// Implements io_dev_abort() for the CAN frame router: pops the matching
// operation(s) (all of them if task is NULL) under the lock and aborts their
// completion tasks without invoking them.
static size_t
io_can_rt_dev_abort(io_dev_t *dev, struct ev_task *task)
{
	io_can_rt_t *rt = io_can_rt_from_dev(dev);

	struct sllist queue;
	sllist_init(&queue);

#if !LELY_NO_THREADS
	mtx_lock(&rt->mtx);
#endif
	// Message and error operations are collected into the same queue,
	// since aborting does not distinguish between them.
	io_can_rt_do_pop(rt, &queue, &queue, task);
#if !LELY_NO_THREADS
	mtx_unlock(&rt->mtx);
#endif
	return ev_task_queue_abort(&queue);
}
573 
// Implements the shutdown function of the I/O service: marks the router as
// shut down (idempotently), aborts the pending channel read and cancels all
// pending read operations.
static void
io_can_rt_svc_shutdown(struct io_svc *svc)
{
	io_can_rt_t *rt = io_can_rt_from_svc(svc);
	io_dev_t *dev = &rt->dev_vptr;

#if !LELY_NO_THREADS
	mtx_lock(&rt->mtx);
#endif
	// shutdown is non-zero only on the first invocation.
	int shutdown = !rt->shutdown;
	rt->shutdown = 1;
	// Abort io_can_rt_read_func().
	if (shutdown && rt->submitted
			&& io_can_chan_abort_read(rt->chan, &rt->read))
		rt->submitted = 0;
#if !LELY_NO_THREADS
	mtx_unlock(&rt->mtx);
#endif

	if (shutdown)
		// Cancel all pending operations.
		io_can_rt_dev_cancel(dev, NULL);
}
597 
598 static void
599 io_can_rt_read_func(struct ev_task *task)
600 {
601  assert(task);
603  io_can_rt_t *rt = structof(read, io_can_rt_t, read);
604 
605  if (rt->read.r.result > 0) {
606  struct sllist queue;
607  sllist_init(&queue);
608  struct io_can_rt_read_msg key;
609  key.id = rt->read.msg->id;
610  key.flags = rt->read.msg->flags;
611 #if !LELY_NO_THREADS
612  mtx_lock(&rt->mtx);
613 #endif
614  struct rbnode *node = rbtree_find(&rt->msg_queue, &key);
615  if (node) {
616  rbtree_remove(&rt->msg_queue, node);
617  struct io_can_rt_read_msg *read_msg = structof(
618  node, struct io_can_rt_read_msg, _node);
619  sllist_push_back(&queue, &read_msg->task._node);
620  sllist_append(&queue, &read_msg->_queue);
621  }
622 #if !LELY_NO_THREADS
623  mtx_unlock(&rt->mtx);
624 #endif
625  io_can_rt_read_msg_queue_post(&queue, rt->read.msg, 0);
626  } else if (!rt->read.r.result) {
627  struct sllist queue;
628  sllist_init(&queue);
629 #if !LELY_NO_THREADS
630  mtx_lock(&rt->mtx);
631 #endif
632  sllist_append(&queue, &rt->err_queue);
633 #if !LELY_NO_THREADS
634  mtx_unlock(&rt->mtx);
635 #endif
636  io_can_rt_read_err_queue_post(&queue, rt->read.err, 0);
637  } else if (rt->read.r.errc) {
638  struct sllist msg_queue, err_queue;
639  sllist_init(&msg_queue);
640  sllist_init(&err_queue);
641 #if !LELY_NO_THREADS
642  mtx_lock(&rt->mtx);
643 #endif
644  io_can_rt_do_pop(rt, &msg_queue, &err_queue, NULL);
645 #if !LELY_NO_THREADS
646  mtx_unlock(&rt->mtx);
647 #endif
648  io_can_rt_read_msg_queue_post(
649  &msg_queue, NULL, rt->read.r.errc);
650  io_can_rt_read_err_queue_post(
651  &err_queue, NULL, rt->read.r.errc);
652  }
653 
654  ev_exec_post(rt->task.exec, &rt->task);
655 }
656 
// The task posted by io_can_rt_read_func() after each completed read:
// resubmits the channel read if, and only if, operations are still pending
// and the router has not been shut down.
static void
io_can_rt_task_func(struct ev_task *task)
{
	assert(task);
	io_can_rt_t *rt = structof(task, io_can_rt_t, task);

#if !LELY_NO_THREADS
	mtx_lock(&rt->mtx);
#endif
	assert(rt->submitted);
	// Keep rt->submitted consistent with whether the read will actually
	// be resubmitted below.
	int submit = rt->submitted =
			(!rbtree_empty(&rt->msg_queue)
					|| !sllist_empty(&rt->err_queue))
			&& !rt->shutdown;
#if !LELY_NO_THREADS
	mtx_unlock(&rt->mtx);
#endif

	// Submit outside the lock.
	if (submit)
		io_can_chan_submit_read(rt->chan, &rt->read);
}
678 
679 static inline io_can_rt_t *
680 io_can_rt_from_dev(const io_dev_t *dev)
681 {
682  assert(dev);
683 
684  return structof(dev, io_can_rt_t, dev_vptr);
685 }
686 
687 static inline io_can_rt_t *
688 io_can_rt_from_svc(const struct io_svc *svc)
689 {
690  assert(svc);
691 
692  return structof(svc, io_can_rt_t, svc);
693 }
694 
695 static void
696 io_can_rt_do_pop(io_can_rt_t *rt, struct sllist *msg_queue,
697  struct sllist *err_queue, struct ev_task *task)
698 {
699  assert(rt);
700  assert(msg_queue);
701  assert(err_queue);
702 
703  if (!task) {
704  rbtree_foreach (&rt->msg_queue, node) {
705  rbtree_remove(&rt->msg_queue, node);
706  struct io_can_rt_read_msg *read_msg = structof(
707  node, struct io_can_rt_read_msg, _node);
708  sllist_push_back(msg_queue, &read_msg->task._node);
709  sllist_append(msg_queue, &read_msg->_queue);
710  }
711  sllist_append(err_queue, &rt->err_queue);
712  if (rt->submitted)
713  io_can_chan_cancel_read(rt->chan, &rt->read);
714  } else if (task->_data == &rt->msg_queue) {
715  io_can_rt_do_pop_read_msg(rt, msg_queue,
717  } else if (task->_data == &rt->err_queue) {
718  io_can_rt_do_pop_read_err(rt, err_queue,
720  }
721 }
722 
723 static void
724 io_can_rt_do_pop_read_msg(io_can_rt_t *rt, struct sllist *queue,
725  struct io_can_rt_read_msg *read_msg)
726 {
727  assert(rt);
728  assert(queue);
729  assert(read_msg);
730  struct ev_task *task = &read_msg->task;
731  assert(task->_data == &rt->msg_queue);
732 
733  struct rbnode *node = rbtree_find(&rt->msg_queue, read_msg);
734  if (node == &read_msg->_node) {
735  rbtree_remove(&rt->msg_queue, &read_msg->_node);
736  if (!sllist_empty(&read_msg->_queue)) {
737  struct sllist queue = read_msg->_queue;
738  read_msg = io_can_rt_read_msg_from_task(
740  &queue)));
741  rbnode_init(&read_msg->_node, read_msg);
742  sllist_init(&read_msg->_queue);
743  sllist_append(&read_msg->_queue, &queue);
744  rbtree_insert(&rt->msg_queue, &read_msg->_node);
745  } else if (rt->submitted && rbtree_empty(&rt->msg_queue)
746  && sllist_empty(&rt->err_queue)) {
747  io_can_chan_cancel_read(rt->chan, &rt->read);
748  }
749  task->_data = NULL;
750  sllist_push_back(queue, &task->_node);
751  } else if (node) {
752  read_msg = structof(node, struct io_can_rt_read_msg, _node);
753  if (sllist_remove(&read_msg->_queue, &task->_node)) {
754  task->_data = NULL;
755  sllist_push_back(queue, &task->_node);
756  }
757  }
758 }
759 
// Removes a single CAN error frame read operation from the error queue
// (caller holds the mutex) and pushes its task onto queue. If it was the
// last pending operation of any kind, the pending channel read is canceled.
static void
io_can_rt_do_pop_read_err(io_can_rt_t *rt, struct sllist *queue,
		struct io_can_rt_read_err *read_err)
{
	assert(rt);
	assert(queue);
	assert(read_err);
	struct ev_task *task = &read_err->task;
	assert(task->_data == &rt->err_queue);

	if (sllist_remove(&rt->err_queue, &task->_node)) {
		if (rt->submitted && rbtree_empty(&rt->msg_queue)
				&& sllist_empty(&rt->err_queue))
			io_can_chan_cancel_read(rt->chan, &rt->read);
		// Clear the queue tag so a subsequent cancel/abort is a no-op.
		task->_data = NULL;
		sllist_push_back(queue, &task->_node);
	}
}
778 
779 static void
780 io_can_rt_read_msg_post(struct io_can_rt_read_msg *read_msg,
781  const struct can_msg *msg, int errc)
782 {
783  read_msg->r.msg = msg;
784  read_msg->r.errc = errc;
785 
786  ev_exec_t *exec = read_msg->task.exec;
787  ev_exec_post(exec, &read_msg->task);
789 }
790 
791 static size_t
792 io_can_rt_read_msg_queue_post(
793  struct sllist *queue, const struct can_msg *msg, int errc)
794 {
795  size_t n = 0;
796 
797  struct slnode *node;
798  while ((node = sllist_pop_front(queue))) {
799  struct ev_task *task = ev_task_from_node(node);
800  struct io_can_rt_read_msg *read_msg =
802  io_can_rt_read_msg_post(read_msg, msg, errc);
803  n += n < SIZE_MAX;
804  }
805 
806  return n;
807 }
808 
// Stores the result of a CAN error frame read operation and posts its
// completion task. The ev_exec_on_task_fini() call balances the
// ev_exec_on_task_init() in io_can_rt_submit_read_err().
static void
io_can_rt_read_err_post(struct io_can_rt_read_err *read_err,
		const struct can_err *err, int errc)
{
	read_err->r.err = err;
	read_err->r.errc = errc;

	ev_exec_t *exec = read_err->task.exec;
	ev_exec_post(exec, &read_err->task);
	ev_exec_on_task_fini(exec);
}
820 
821 static size_t
822 io_can_rt_read_err_queue_post(
823  struct sllist *queue, const struct can_err *err, int errc)
824 {
825  size_t n = 0;
826 
827  struct slnode *node;
828  while ((node = sllist_pop_front(queue))) {
829  struct ev_task *task = ev_task_from_node(node);
830  struct io_can_rt_read_err *read_err =
832  io_can_rt_read_err_post(read_err, err, errc);
833  n += n < SIZE_MAX;
834  }
835 
836  return n;
837 }
838 
839 static int
840 io_can_rt_read_msg_cmp(const void *p1, const void *p2)
841 {
842  const struct io_can_rt_read_msg *r1 = p1;
843  assert(r1);
844  const struct io_can_rt_read_msg *r2 = p2;
845  assert(r2);
846 
847  int cmp = (r2->id < r1->id) - (r1->id < r2->id);
848  if (!cmp)
849  cmp = (r2->flags < r1->flags) - (r1->flags < r2->flags);
850  return cmp;
851 }
852 
853 static void
854 io_can_rt_async_read_msg_func(struct ev_task *task)
855 {
856  assert(task);
857  struct io_can_rt_read_msg *read_msg =
859  struct io_can_rt_async_read_msg *async_read_msg = structof(
860  read_msg, struct io_can_rt_async_read_msg, read_msg);
861 
862  if (ev_promise_set_acquire(async_read_msg->promise)) {
863  if (read_msg->r.msg) {
864  async_read_msg->msg = *read_msg->r.msg;
865  read_msg->r.msg = &async_read_msg->msg;
866  }
867  ev_promise_set_release(async_read_msg->promise, &read_msg->r);
868  }
869  ev_promise_release(async_read_msg->promise);
870 }
871 
872 static void
873 io_can_rt_async_read_err_func(struct ev_task *task)
874 {
875  assert(task);
876  struct io_can_rt_read_err *read_err =
878  struct io_can_rt_async_read_err *async_read_err = structof(
879  read_err, struct io_can_rt_async_read_err, read_err);
880 
881  if (ev_promise_set_acquire(async_read_err->promise)) {
882  if (read_err->r.err) {
883  async_read_err->err = *read_err->r.err;
884  read_err->r.err = &async_read_err->err;
885  }
886  ev_promise_set_release(async_read_err->promise, &read_err->r);
887  }
888  ev_promise_release(async_read_err->promise);
889 }
890 
891 static void
892 io_can_rt_async_shutdown_func(struct ev_task *task)
893 {
894  assert(task);
895  struct io_can_rt_async_shutdown *async_shutdown =
896  structof(task, struct io_can_rt_async_shutdown, task);
897 
898  ev_promise_set(async_shutdown->promise, NULL);
899  ev_promise_release(async_shutdown->promise);
900 }
#define CAN_MSG_INIT
The static initializer for can_msg.
Definition: msg.h:113
size_t io_can_rt_cancel_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Cancels the specified CAN error frame read operation if it is pending.
Definition: can_rt.c:417
io_can_rt_t * io_can_rt_create(io_can_chan_t *chan, ev_exec_t *exec)
Creates a new CAN frame router.
Definition: can_rt.c:220
void io_can_rt_submit_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Submits a CAN error frame read operation to a CAN frame router.
Definition: can_rt.c:384
io_dev_t * io_can_rt_get_dev(const io_can_rt_t *rt)
Returns a pointer to the abstract I/O device representing the CAN frame router.
Definition: can_rt.c:256
size_t io_can_rt_abort_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Aborts the specified CAN error frame read operation if it is pending.
Definition: can_rt.c:440
ev_future_t * io_can_rt_async_read_err(io_can_rt_t *rt, struct io_can_rt_read_err **pread_err)
Submits an asynchronous CAN error frame read operation to a CAN frame router and creates a future whi...
Definition: can_rt.c:462
size_t io_can_rt_abort_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Aborts the specified CAN frame read operation if it is pending.
Definition: can_rt.c:337
void io_can_rt_destroy(io_can_rt_t *rt)
Destroys a CAN frame router.
Definition: can_rt.c:247
struct io_can_rt_read_err * io_can_rt_read_err_from_task(struct ev_task *task)
Obtains a pointer to a CAN error frame read operation from a pointer to its completion task.
Definition: can_rt.c:512
ev_future_t * io_can_rt_async_read_msg(io_can_rt_t *rt, uint_least32_t id, uint_least8_t flags, struct io_can_rt_read_msg **pread_msg)
Submits an asynchronous CAN frame read operation to a CAN frame router and creates a future which bec...
Definition: can_rt.c:359
void io_can_rt_submit_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Submits a CAN frame read operation to a CAN frame router.
Definition: can_rt.c:272
struct io_can_rt_read_msg * io_can_rt_read_msg_from_task(struct ev_task *task)
Obtains a pointer to a CAN frame read operation from a pointer to its completion task.
Definition: can_rt.c:506
size_t io_can_rt_cancel_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Cancels the specified CAN frame read operation if it is pending.
Definition: can_rt.c:314
io_can_chan_t * io_can_rt_get_chan(const io_can_rt_t *rt)
Returns a pointer to the CAN channel used by the CAN frame router.
Definition: can_rt.c:264
ev_future_t * io_can_rt_async_shutdown(io_can_rt_t *rt)
Shuts down a CAN frame router, cancels all pending operations and creates a (void) future which becom...
Definition: can_rt.c:486
This header file is part of the I/O library; it contains the CAN frame router declarations.
#define IO_CAN_RT_READ_MSG_INIT(id, flags, func)
The static initializer for io_can_rt_read_msg.
Definition: can_rt.h:77
#define IO_CAN_RT_READ_ERR_INIT(func)
The static initializer for io_can_rt_read_err.
Definition: can_rt.h:111
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:121
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:136
This header file is part of the utilities library; it contains the native and platform-independent er...
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition: errnum.c:825
@ ERRNUM_CANCELED
Operation canceled.
Definition: errnum.h:96
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:947
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:957
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:43
This header file is part of the event library; it contains the abstract task executor interface.
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:126
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:114
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:108
void ev_promise_release(ev_promise_t *promise)
Releases a reference to a promise.
Definition: future.c:195
int ev_promise_set(ev_promise_t *promise, void *value)
Satisfies a promise, if it was not already satisfied, and stores the specified value for retrieval by...
Definition: future.c:239
void ev_promise_set_release(ev_promise_t *promise, void *value)
Satisfies a promise prepared by ev_promise_set_acquire(), and stores the specified value for retrieva...
Definition: future.c:272
ev_future_t * ev_promise_get_future(ev_promise_t *promise)
Returns (a reference to) a future associated with the specified promise.
Definition: future.c:309
ev_promise_t * ev_promise_create(size_t size, ev_promise_dtor_t *dtor)
Constructs a new promise with an optional empty shared state.
Definition: future.c:151
void * ev_promise_data(const ev_promise_t *promise)
Returns a pointer to the shared state of a promise.
Definition: future.c:233
int ev_promise_set_acquire(ev_promise_t *promise)
Checks if the specified promise can be satisfied by the caller and, if so, prevents others from satis...
Definition: future.c:248
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:91
static size_t io_can_chan_abort_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Aborts the specified CAN channel read operation if it is pending.
Definition: can.h:500
static size_t io_can_chan_cancel_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Cancels the specified CAN channel read operation if it is pending.
Definition: can.h:494
io_dev_t * io_can_chan_get_dev(const io_can_chan_t *chan)
Returns a pointer to the abstract I/O device representing the CAN channel.
Definition: can.h:469
void io_can_chan_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Submits a read operation to a CAN channel.
Definition: can.h:488
#define IO_CAN_CHAN_READ_INIT(msg, err, tp, exec, func)
The static initializer for io_can_chan_read.
Definition: can.h:104
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
#define CAN_ERR_INIT
The static initializer for a can_err struct.
Definition: err.h:43
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
io_ctx_t * io_dev_get_ctx(const io_dev_t *dev)
Returns a pointer to the I/O context with which the I/O device is registered.
Definition: dev.h:80
ev_exec_t * io_dev_get_exec(const io_dev_t *dev)
Returns a pointer to the executor used by the I/O device to execute asynchronous tasks.
Definition: dev.h:86
void rbnode_init(struct rbnode *node, const void *key)
Initializes a node in a red-black tree.
Definition: rbtree.h:229
void rbtree_insert(struct rbtree *tree, struct rbnode *node)
Inserts a node into a red-black tree.
Definition: rbtree.c:108
void rbtree_init(struct rbtree *tree, rbtree_cmp_t *cmp)
Initializes a red-black tree.
Definition: rbtree.h:238
void rbtree_remove(struct rbtree *tree, struct rbnode *node)
Removes a node from a red-black tree.
Definition: rbtree.c:187
int rbtree_empty(const struct rbtree *tree)
Returns 1 if the red-black tree is empty, and 0 if not.
Definition: rbtree.h:246
#define rbtree_foreach(tree, node)
Iterates over each node in a red-black tree in ascending order.
Definition: rbtree.h:226
struct rbnode * rbtree_find(const struct rbtree *tree, const void *key)
Finds a node in a red-black tree.
Definition: rbtree.c:306
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:184
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:233
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:213
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:190
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:221
This is the internal header file of the I/O library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
This header file is part of the event library; it contains the strand executor declarations.
void ev_strand_destroy(ev_exec_t *exec)
Destroys a strand executor.
Definition: strand.c:175
ev_exec_t * ev_strand_create(ev_exec_t *inner_exec)
Creates a strand executor.
Definition: strand.c:148
A CAN error frame.
Definition: err.h:28
A CAN or CAN FD format frame.
Definition: msg.h:87
uint_least32_t id
The identifier (11 or 29 bits, depending on the CAN_FLAG_IDE flag).
Definition: msg.h:89
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI).
Definition: msg.h:94
A future.
Definition: future.c:63
A promise.
Definition: future.c:92
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
int result
The result of the read operation: 1 if a CAN frame is received, 0 if an error frame is received, and -1 on error (or if the operation is canceled).
Definition: can.h:68
int errc
The error number, obtained as if by get_errc(), if result is -1.
Definition: can.h:70
A CAN channel read operation.
Definition: can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
struct io_can_chan_read_result r
The result of the read operation.
Definition: can.h:100
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
const struct can_err * err
A pointer to the received CAN error frame, or NULL on error (or if the operation is canceled).
Definition: can_rt.h:94
int errc
The error number, obtained as if by get_errc(), if err is NULL.
Definition: can_rt.h:96
A CAN error frame read operation suitable for use with a CAN frame router.
Definition: can_rt.h:100
struct io_can_rt_read_err_result r
The result of the read operation.
Definition: can_rt.h:107
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can_rt.h:105
int errc
The error number, obtained as if by get_errc(), if msg is NULL.
Definition: can_rt.h:48
const struct can_msg * msg
A pointer to the received CAN frame, or NULL on error (or if the operation is canceled).
Definition: can_rt.h:46
A CAN frame read operation suitable for use with a CAN frame router.
Definition: can_rt.h:52
uint_least32_t id
The identifier of the CAN frame to be received.
Definition: can_rt.h:57
struct io_can_rt_read_msg_result r
The result of the read operation.
Definition: can_rt.h:71
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can_rt.h:69
uint_least8_t flags
The flags of the CAN frame to be received (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI).
Definition: can_rt.h:64
Definition: ctx.c:35
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A node in a red-black tree.
Definition: rbtree.h:52
const void * key
A pointer to the key for this node.
Definition: rbtree.h:58
A red-black tree.
Definition: rbtree.h:90
A singly-linked list.
Definition: sllist.h:51
A node in a singly-linked list.
Definition: sllist.h:39
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads.h> and defines any missing functionality.
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values: mtx_plain, mtx_timed, mtx_plain | mtx_recursive, or mtx_timed | mtx_recursive.
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109