Lely core libraries  2.3.4
can_rt.c
Go to the documentation of this file.
1 
24 #include "io2.h"
25 
26 #if !LELY_NO_MALLOC
27 
28 #if !LELY_NO_THREADS
29 #include <lely/libc/threads.h>
30 #endif
31 #include <lely/ev/exec.h>
32 #include <lely/ev/strand.h>
33 #include <lely/io2/can_rt.h>
34 #include <lely/io2/ctx.h>
35 #include <lely/util/errnum.h>
36 #include <lely/util/util.h>
37 
38 #include <assert.h>
39 #include <stdlib.h>
40 
41 static io_ctx_t *io_can_rt_dev_get_ctx(const io_dev_t *dev);
42 static ev_exec_t *io_can_rt_dev_get_exec(const io_dev_t *dev);
43 static size_t io_can_rt_dev_cancel(io_dev_t *dev, struct ev_task *task);
44 static size_t io_can_rt_dev_abort(io_dev_t *dev, struct ev_task *task);
45 
46 // clang-format off
47 static const struct io_dev_vtbl io_can_rt_dev_vtbl = {
48  &io_can_rt_dev_get_ctx,
49  &io_can_rt_dev_get_exec,
50  &io_can_rt_dev_cancel,
51  &io_can_rt_dev_abort
52 };
53 // clang-format on
54 
55 static void io_can_rt_svc_shutdown(struct io_svc *svc);
56 
57 // clang-format off
58 static const struct io_svc_vtbl io_can_rt_svc_vtbl = {
59  NULL,
60  &io_can_rt_svc_shutdown
61 };
62 // clang-format on
63 
/// The internal state of a CAN frame router.
struct io_can_rt {
	/// A pointer to the virtual table for the abstract I/O device.
	const struct io_dev_vtbl *dev_vptr;
	/// The I/O service registered with #ctx (see io_ctx_insert()).
	struct io_svc svc;
	/// A pointer to the I/O context with which the router is registered.
	io_ctx_t *ctx;
	/// A pointer to the CAN channel from which frames are read.
	io_can_chan_t *chan;
	/// The strand executor (created in io_can_rt_init()) on which all
	/// completion tasks run.
	ev_exec_t *exec;
	/// The CAN frame buffer referenced by #read.
	struct can_msg msg;
	/// The CAN error frame buffer referenced by #read.
	struct can_err err;
	/// The single channel read operation shared by all pending reads.
	struct io_can_chan_read read;
	/// The task (io_can_rt_task_func()) used to resubmit #read.
	struct ev_task task;
#if !LELY_NO_THREADS
	/// The mutex protecting the flags and queues below.
	mtx_t mtx;
#endif
	/// A flag indicating the service has been shut down.
	unsigned shutdown : 1;
	/// A flag indicating #read has been submitted to #chan.
	unsigned submitted : 1;
	/// The tree of pending CAN frame read operations, keyed by (id, flags)
	/// (see io_can_rt_read_msg_cmp()).
	struct rbtree msg_queue;
	/// The queue of pending CAN error frame read operations.
	struct sllist err_queue;
};
82 
83 static void io_can_rt_read_func(struct ev_task *task);
84 static void io_can_rt_task_func(struct ev_task *task);
85 
86 static inline io_can_rt_t *io_can_rt_from_dev(const io_dev_t *dev);
87 static inline io_can_rt_t *io_can_rt_from_svc(const struct io_svc *svc);
88 
89 static void io_can_rt_do_pop(io_can_rt_t *rt, struct sllist *msg_queue,
90  struct sllist *err_queue, struct ev_task *task);
91 static void io_can_rt_do_pop_read_msg(io_can_rt_t *rt, struct sllist *queue,
92  struct io_can_rt_read_msg *read_msg);
93 static void io_can_rt_do_pop_read_err(io_can_rt_t *rt, struct sllist *queue,
94  struct io_can_rt_read_err *read_err);
95 
96 static void io_can_rt_read_msg_post(struct io_can_rt_read_msg *read_msg,
97  const struct can_msg *msg, int errc);
98 static size_t io_can_rt_read_msg_queue_post(
99  struct sllist *queue, const struct can_msg *msg, int errc);
100 
101 static void io_can_rt_read_err_post(struct io_can_rt_read_err *read_err,
102  const struct can_err *err, int errc);
103 static size_t io_can_rt_read_err_queue_post(
104  struct sllist *queue, const struct can_err *err, int errc);
105 
106 static int io_can_rt_read_msg_cmp(const void *p1, const void *p2);
107 
109  ev_promise_t *promise;
110  struct io_can_rt_read_msg read_msg;
111  struct can_msg msg;
112 };
113 
114 static void io_can_rt_async_read_msg_func(struct ev_task *task);
115 
117  ev_promise_t *promise;
118  struct io_can_rt_read_err read_err;
119  struct can_err err;
120 };
121 
122 static void io_can_rt_async_read_err_func(struct ev_task *task);
123 
125  ev_promise_t *promise;
126  struct ev_task task;
127 };
128 
129 static void io_can_rt_async_shutdown_func(struct ev_task *task);
130 
void *
io_can_rt_alloc(void)
{
	// Allocate uninitialized storage for a CAN frame router; the caller is
	// expected to pass it to io_can_rt_init().
	void *ptr = malloc(sizeof(io_can_rt_t));
#if !LELY_NO_ERRNO
	// Translate the standard C error number to a native error code.
	if (!ptr)
		set_errc(errno2c(errno));
#endif
	return ptr;
}
141 
void
io_can_rt_free(void *ptr)
{
	// free(NULL) is a no-op, so no guard is needed.
	free(ptr);
}
147 
io_can_rt_t *
io_can_rt_init(io_can_rt_t *rt, io_can_chan_t *chan, ev_exec_t *exec)
{
	assert(rt);
	assert(chan);
	io_dev_t *dev = io_can_chan_get_dev(chan);
	assert(dev);
	io_ctx_t *ctx = io_dev_get_ctx(dev);
	assert(ctx);
	// Fall back to the channel's executor if none is provided.
	if (!exec)
		exec = io_dev_get_exec(dev);
	assert(exec);

	int errc = 0;

	rt->dev_vptr = &io_can_rt_dev_vtbl;

	rt->svc = (struct io_svc)IO_SVC_INIT(&io_can_rt_svc_vtbl);
	rt->ctx = ctx;

	rt->chan = chan;

	// Wrap the executor in a strand so completion tasks are serialized.
	rt->exec = ev_strand_create(exec);
	if (!rt->exec) {
		errc = get_errc();
		goto error_create_strand;
	}

	// The single channel read operation shared by all pending reads; it
	// stores results in rt->msg/rt->err.
	rt->msg = (struct can_msg)CAN_MSG_INIT;
	rt->err = (struct can_err)CAN_ERR_INIT;
	rt->read = (struct io_can_chan_read)IO_CAN_CHAN_READ_INIT(&rt->msg,
			&rt->err, NULL, rt->exec, &io_can_rt_read_func);

	rt->task = (struct ev_task)EV_TASK_INIT(rt->exec, &io_can_rt_task_func);

#if !LELY_NO_THREADS
	if (mtx_init(&rt->mtx, mtx_plain) != thrd_success) {
		errc = get_errc();
		goto error_init_mtx;
	}
#endif

	rt->shutdown = 0;
	rt->submitted = 0;

	rbtree_init(&rt->msg_queue, &io_can_rt_read_msg_cmp);
	sllist_init(&rt->err_queue);

	// Register the router as a service with its I/O context.
	io_ctx_insert(rt->ctx, &rt->svc);

	return rt;

	// Error handling: undo initialization in reverse order.
#if !LELY_NO_THREADS
	// mtx_destroy(&rt->mtx);
error_init_mtx:
#endif
	ev_strand_destroy(rt->exec);
error_create_strand:
	set_errc(errc);
	return NULL;
}
209 
void
io_can_rt_fini(io_can_rt_t *rt)
{
	assert(rt);

	// Unregister the service from its I/O context.
	io_ctx_remove(rt->ctx, &rt->svc);

#if !LELY_NO_THREADS
	mtx_destroy(&rt->mtx);
#endif

	// Destroy the strand executor created in io_can_rt_init().
	ev_strand_destroy(rt->exec);
}
223 
224 io_can_rt_t *
226 {
227  int errc = 0;
228 
229  io_can_rt_t *rt = io_can_rt_alloc();
230  if (!rt) {
231  errc = get_errc();
232  goto error_alloc;
233  }
234 
235  io_can_rt_t *tmp = io_can_rt_init(rt, chan, exec);
236  if (!tmp) {
237  errc = get_errc();
238  goto error_init;
239  }
240  rt = tmp;
241 
242  return rt;
243 
244 error_init:
245  io_can_rt_free(rt);
246 error_alloc:
247  set_errc(errc);
248  return NULL;
249 }
250 
251 void
253 {
254  if (rt) {
255  io_can_rt_fini(rt);
256  io_can_rt_free(rt);
257  }
258 }
259 
260 io_dev_t *
262 {
263  assert(rt);
264 
265  return &rt->dev_vptr;
266 }
267 
270 {
271  assert(rt);
272 
273  return rt->chan;
274 }
275 
276 void
278 {
279  assert(rt);
280  assert(read_msg);
281  struct ev_task *task = &read_msg->task;
282 
283  task->exec = rt->exec;
284  ev_exec_on_task_init(task->exec);
285 
286 #if !LELY_NO_THREADS
287  mtx_lock(&rt->mtx);
288 #endif
289  if (rt->shutdown) {
290 #if !LELY_NO_THREADS
291  mtx_unlock(&rt->mtx);
292 #endif
293  io_can_rt_read_msg_post(
294  read_msg, NULL, errnum2c(ERRNUM_CANCELED));
295  } else {
296  task->_data = &rt->msg_queue;
297  struct rbnode *node = rbtree_find(&rt->msg_queue, read_msg);
298  if (node) {
299  read_msg = structof(
300  node, struct io_can_rt_read_msg, _node);
301  sllist_push_back(&read_msg->_queue, &task->_node);
302  } else {
303  rbnode_init(&read_msg->_node, read_msg);
304  sllist_init(&read_msg->_queue);
305  rbtree_insert(&rt->msg_queue, &read_msg->_node);
306  }
307  int submit = !rt->submitted;
308  if (submit)
309  rt->submitted = 1;
310 #if !LELY_NO_THREADS
311  mtx_unlock(&rt->mtx);
312 #endif
313  // cppcheck-suppress duplicateCondition
314  if (submit)
315  io_can_chan_submit_read(rt->chan, &rt->read);
316  }
317 }
318 
319 size_t
321 {
322  assert(rt);
323  assert(read_msg);
324  struct ev_task *task = &read_msg->task;
325 
326  struct sllist queue;
327  sllist_init(&queue);
328 
329 #if !LELY_NO_THREADS
330  mtx_lock(&rt->mtx);
331 #endif
332  if (task->_data == &rt->msg_queue)
333  io_can_rt_do_pop_read_msg(rt, &queue, read_msg);
334 #if !LELY_NO_THREADS
335  mtx_unlock(&rt->mtx);
336 #endif
337 
338  return io_can_rt_read_msg_queue_post(
339  &queue, NULL, errnum2c(ERRNUM_CANCELED));
340 }
341 
342 size_t
344 {
345  assert(rt);
346  assert(read_msg);
347  struct ev_task *task = &read_msg->task;
348 
349  struct sllist queue;
350  sllist_init(&queue);
351 
352 #if !LELY_NO_THREADS
353  mtx_lock(&rt->mtx);
354 #endif
355  if (task->_data == &rt->msg_queue)
356  io_can_rt_do_pop_read_msg(rt, &queue, read_msg);
357 #if !LELY_NO_THREADS
358  mtx_unlock(&rt->mtx);
359 #endif
360 
361  return ev_task_queue_abort(&queue);
362 }
363 
364 ev_future_t *
365 io_can_rt_async_read_msg(io_can_rt_t *rt, uint_least32_t id,
366  uint_least8_t flags, struct io_can_rt_read_msg **pread_msg)
367 {
368  ev_promise_t *promise = ev_promise_create(
369  sizeof(struct io_can_rt_async_read_msg), NULL);
370  if (!promise)
371  return NULL;
372  ev_future_t *future = ev_promise_get_future(promise);
373 
374  struct io_can_rt_async_read_msg *async_read_msg =
375  ev_promise_data(promise);
376  async_read_msg->promise = promise;
377  async_read_msg->read_msg =
379  flags, &io_can_rt_async_read_msg_func);
380  async_read_msg->msg = (struct can_msg)CAN_MSG_INIT;
381 
382  io_can_rt_submit_read_msg(rt, &async_read_msg->read_msg);
383 
384  if (pread_msg)
385  *pread_msg = &async_read_msg->read_msg;
386 
387  return future;
388 }
389 
390 void
392 {
393  assert(rt);
394  assert(read_err);
395  struct ev_task *task = &read_err->task;
396 
397  task->exec = rt->exec;
398  ev_exec_on_task_init(task->exec);
399 
400 #if !LELY_NO_THREADS
401  mtx_lock(&rt->mtx);
402 #endif
403  if (rt->shutdown) {
404 #if !LELY_NO_THREADS
405  mtx_unlock(&rt->mtx);
406 #endif
407  io_can_rt_read_err_post(
408  read_err, NULL, errnum2c(ERRNUM_CANCELED));
409  } else {
410  task->_data = &rt->err_queue;
411  sllist_push_back(&rt->err_queue, &task->_node);
412  int submit = !rt->submitted;
413  if (submit)
414  rt->submitted = 1;
415 #if !LELY_NO_THREADS
416  mtx_unlock(&rt->mtx);
417 #endif
418  // cppcheck-suppress duplicateCondition
419  if (submit)
420  io_can_chan_submit_read(rt->chan, &rt->read);
421  }
422 }
423 
424 size_t
426 {
427  assert(rt);
428  assert(read_err);
429  struct ev_task *task = &read_err->task;
430 
431  struct sllist queue;
432  sllist_init(&queue);
433 
434 #if !LELY_NO_THREADS
435  mtx_lock(&rt->mtx);
436 #endif
437  if (task->_data == &rt->err_queue)
438  io_can_rt_do_pop_read_err(rt, &queue, read_err);
439 #if !LELY_NO_THREADS
440  mtx_unlock(&rt->mtx);
441 #endif
442 
443  return io_can_rt_read_err_queue_post(
444  &queue, NULL, errnum2c(ERRNUM_CANCELED));
445 }
446 
447 size_t
449 {
450  assert(rt);
451  assert(read_err);
452  struct ev_task *task = &read_err->task;
453 
454  struct sllist queue;
455  sllist_init(&queue);
456 
457 #if !LELY_NO_THREADS
458  mtx_lock(&rt->mtx);
459 #endif
460  if (task->_data == &rt->err_queue)
461  io_can_rt_do_pop_read_err(rt, &queue, read_err);
462 #if !LELY_NO_THREADS
463  mtx_unlock(&rt->mtx);
464 #endif
465 
466  return ev_task_queue_abort(&queue);
467 }
468 
469 ev_future_t *
471 {
472  ev_promise_t *promise = ev_promise_create(
473  sizeof(struct io_can_rt_async_read_err), NULL);
474  if (!promise)
475  return NULL;
476  ev_future_t *future = ev_promise_get_future(promise);
477 
478  struct io_can_rt_async_read_err *async_read_err =
479  ev_promise_data(promise);
480  async_read_err->promise = promise;
481  async_read_err->read_err =
483  &io_can_rt_async_read_err_func);
484  async_read_err->err = (struct can_err)CAN_ERR_INIT;
485 
486  io_can_rt_submit_read_err(rt, &async_read_err->read_err);
487 
488  if (pread_err)
489  *pread_err = &async_read_err->read_err;
490 
491  return future;
492 }
493 
494 ev_future_t *
496 {
497  ev_promise_t *promise = ev_promise_create(
498  sizeof(struct io_can_rt_async_shutdown), NULL);
499  if (!promise)
500  return NULL;
501  ev_future_t *future = ev_promise_get_future(promise);
502 
503  struct io_can_rt_async_shutdown *async_shutdown =
504  ev_promise_data(promise);
505  async_shutdown->promise = promise;
506  async_shutdown->task = (struct ev_task)EV_TASK_INIT(
507  rt->exec, &io_can_rt_async_shutdown_func);
508 
509  io_can_rt_svc_shutdown(&rt->svc);
510  ev_exec_post(async_shutdown->task.exec, &async_shutdown->task);
511 
512  return future;
513 }
514 
515 struct io_can_rt_read_msg *
517 {
518  return task ? structof(task, struct io_can_rt_read_msg, task) : NULL;
519 }
520 
521 struct io_can_rt_read_err *
523 {
524  return task ? structof(task, struct io_can_rt_read_err, task) : NULL;
525 }
526 
// Implements io_dev_get_ctx() for the CAN frame router.
static io_ctx_t *
io_can_rt_dev_get_ctx(const io_dev_t *dev)
{
	const io_can_rt_t *rt = io_can_rt_from_dev(dev);

	return rt->ctx;
}
534 
// Implements io_dev_get_exec() for the CAN frame router; returns the strand
// executor created in io_can_rt_init().
static ev_exec_t *
io_can_rt_dev_get_exec(const io_dev_t *dev)
{
	const io_can_rt_t *rt = io_can_rt_from_dev(dev);

	return rt->exec;
}
542 
// Implements io_dev_cancel() for the CAN frame router; cancels the matching
// pending operations (all of them if task is NULL) and returns the number
// canceled.
static size_t
io_can_rt_dev_cancel(io_dev_t *dev, struct ev_task *task)
{
	io_can_rt_t *rt = io_can_rt_from_dev(dev);

	struct sllist msg_queue, err_queue;
	sllist_init(&msg_queue);
	sllist_init(&err_queue);

#if !LELY_NO_THREADS
	mtx_lock(&rt->mtx);
#endif
	// Detach matching operations under the lock; their completion tasks
	// are posted after the lock is released.
	io_can_rt_do_pop(rt, &msg_queue, &err_queue, task);
#if !LELY_NO_THREADS
	mtx_unlock(&rt->mtx);
#endif
	size_t nmsg = io_can_rt_read_msg_queue_post(
			&msg_queue, NULL, errnum2c(ERRNUM_CANCELED));
	size_t nerr = io_can_rt_read_err_queue_post(
			&err_queue, NULL, errnum2c(ERRNUM_CANCELED));
	// Saturate rather than overflow the combined count.
	return nerr < SIZE_MAX - nmsg ? nmsg + nerr : SIZE_MAX;
}
565 
// Implements io_dev_abort() for the CAN frame router; aborts the matching
// pending operations (all of them if task is NULL) without executing their
// completion tasks.
static size_t
io_can_rt_dev_abort(io_dev_t *dev, struct ev_task *task)
{
	io_can_rt_t *rt = io_can_rt_from_dev(dev);

	struct sllist queue;
	sllist_init(&queue);

#if !LELY_NO_THREADS
	mtx_lock(&rt->mtx);
#endif
	// Message and error operations can share one queue here since all of
	// them are aborted, not executed.
	io_can_rt_do_pop(rt, &queue, &queue, task);
#if !LELY_NO_THREADS
	mtx_unlock(&rt->mtx);
#endif
	return ev_task_queue_abort(&queue);
}
583 
// Implements the shutdown handler of the io_svc interface; marks the router as
// shut down (idempotently) and cancels all pending operations.
static void
io_can_rt_svc_shutdown(struct io_svc *svc)
{
	io_can_rt_t *rt = io_can_rt_from_svc(svc);
	io_dev_t *dev = &rt->dev_vptr;

#if !LELY_NO_THREADS
	mtx_lock(&rt->mtx);
#endif
	// Non-zero only on the first shutdown; later calls are no-ops.
	int shutdown = !rt->shutdown;
	rt->shutdown = 1;
	// Abort io_can_rt_read_func().
	if (shutdown && rt->submitted
			&& io_can_chan_abort_read(rt->chan, &rt->read))
		rt->submitted = 0;
#if !LELY_NO_THREADS
	mtx_unlock(&rt->mtx);
#endif

	if (shutdown)
		// Cancel all pending operations.
		io_can_rt_dev_cancel(dev, NULL);
}
607 
608 static void
609 io_can_rt_read_func(struct ev_task *task)
610 {
611  assert(task);
613  io_can_rt_t *rt = structof(read, io_can_rt_t, read);
614 
615  if (rt->read.r.result > 0) {
616  struct sllist queue;
617  sllist_init(&queue);
618  struct io_can_rt_read_msg key;
619  key.id = rt->read.msg->id;
620  key.flags = rt->read.msg->flags;
621 #if !LELY_NO_THREADS
622  mtx_lock(&rt->mtx);
623 #endif
624  struct rbnode *node = rbtree_find(&rt->msg_queue, &key);
625  if (node) {
626  rbtree_remove(&rt->msg_queue, node);
627  struct io_can_rt_read_msg *read_msg = structof(
628  node, struct io_can_rt_read_msg, _node);
629  sllist_push_back(&queue, &read_msg->task._node);
630  sllist_append(&queue, &read_msg->_queue);
631  }
632 #if !LELY_NO_THREADS
633  mtx_unlock(&rt->mtx);
634 #endif
635  io_can_rt_read_msg_queue_post(&queue, rt->read.msg, 0);
636  } else if (!rt->read.r.result) {
637  struct sllist queue;
638  sllist_init(&queue);
639 #if !LELY_NO_THREADS
640  mtx_lock(&rt->mtx);
641 #endif
642  sllist_append(&queue, &rt->err_queue);
643 #if !LELY_NO_THREADS
644  mtx_unlock(&rt->mtx);
645 #endif
646  io_can_rt_read_err_queue_post(&queue, rt->read.err, 0);
647  } else if (rt->read.r.errc) {
648  struct sllist msg_queue, err_queue;
649  sllist_init(&msg_queue);
650  sllist_init(&err_queue);
651 #if !LELY_NO_THREADS
652  mtx_lock(&rt->mtx);
653 #endif
654  io_can_rt_do_pop(rt, &msg_queue, &err_queue, NULL);
655 #if !LELY_NO_THREADS
656  mtx_unlock(&rt->mtx);
657 #endif
658  io_can_rt_read_msg_queue_post(
659  &msg_queue, NULL, rt->read.r.errc);
660  io_can_rt_read_err_queue_post(
661  &err_queue, NULL, rt->read.r.errc);
662  }
663 
664  ev_exec_post(rt->task.exec, &rt->task);
665 }
666 
// Posted by io_can_rt_read_func() after each read completes; resubmits the
// shared channel read if any operations are still pending and the router has
// not been shut down.
static void
io_can_rt_task_func(struct ev_task *task)
{
	assert(task);
	io_can_rt_t *rt = structof(task, io_can_rt_t, task);

#if !LELY_NO_THREADS
	mtx_lock(&rt->mtx);
#endif
	assert(rt->submitted);
	// Recompute the submitted flag under the lock and record whether a new
	// read must be issued.
	int submit = rt->submitted =
			(!rbtree_empty(&rt->msg_queue)
					|| !sllist_empty(&rt->err_queue))
			&& !rt->shutdown;
#if !LELY_NO_THREADS
	mtx_unlock(&rt->mtx);
#endif

	if (submit)
		io_can_chan_submit_read(rt->chan, &rt->read);
}
688 
// Obtains the router from a pointer to its abstract I/O device (the dev_vptr
// member).
static inline io_can_rt_t *
io_can_rt_from_dev(const io_dev_t *dev)
{
	assert(dev);

	return structof(dev, io_can_rt_t, dev_vptr);
}
696 
// Obtains the router from a pointer to its I/O service (the svc member).
static inline io_can_rt_t *
io_can_rt_from_svc(const struct io_svc *svc)
{
	assert(svc);

	return structof(svc, io_can_rt_t, svc);
}
704 
705 static void
706 io_can_rt_do_pop(io_can_rt_t *rt, struct sllist *msg_queue,
707  struct sllist *err_queue, struct ev_task *task)
708 {
709  assert(rt);
710  assert(msg_queue);
711  assert(err_queue);
712 
713  if (!task) {
714  rbtree_foreach (&rt->msg_queue, node) {
715  rbtree_remove(&rt->msg_queue, node);
716  struct io_can_rt_read_msg *read_msg = structof(
717  node, struct io_can_rt_read_msg, _node);
718  sllist_push_back(msg_queue, &read_msg->task._node);
719  sllist_append(msg_queue, &read_msg->_queue);
720  }
721  sllist_append(err_queue, &rt->err_queue);
722  if (rt->submitted)
723  io_can_chan_cancel_read(rt->chan, &rt->read);
724  } else if (task->_data == &rt->msg_queue) {
725  io_can_rt_do_pop_read_msg(rt, msg_queue,
727  } else if (task->_data == &rt->err_queue) {
728  io_can_rt_do_pop_read_err(rt, err_queue,
730  }
731 }
732 
733 static void
734 io_can_rt_do_pop_read_msg(io_can_rt_t *rt, struct sllist *queue,
735  struct io_can_rt_read_msg *read_msg)
736 {
737  assert(rt);
738  assert(queue);
739  assert(read_msg);
740  struct ev_task *task = &read_msg->task;
741  assert(task->_data == &rt->msg_queue);
742 
743  struct rbnode *node = rbtree_find(&rt->msg_queue, read_msg);
744  if (node == &read_msg->_node) {
745  rbtree_remove(&rt->msg_queue, &read_msg->_node);
746  if (!sllist_empty(&read_msg->_queue)) {
747  struct sllist queue = read_msg->_queue;
748  read_msg = io_can_rt_read_msg_from_task(
750  &queue)));
751  rbnode_init(&read_msg->_node, read_msg);
752  sllist_init(&read_msg->_queue);
753  sllist_append(&read_msg->_queue, &queue);
754  rbtree_insert(&rt->msg_queue, &read_msg->_node);
755  } else if (rt->submitted && rbtree_empty(&rt->msg_queue)
756  && sllist_empty(&rt->err_queue)) {
757  io_can_chan_cancel_read(rt->chan, &rt->read);
758  }
759  task->_data = NULL;
760  sllist_push_back(queue, &task->_node);
761  } else if (node) {
762  read_msg = structof(node, struct io_can_rt_read_msg, _node);
763  if (sllist_remove(&read_msg->_queue, &task->_node)) {
764  task->_data = NULL;
765  sllist_push_back(queue, &task->_node);
766  }
767  }
768 }
769 
// Removes a single pending CAN error frame read operation while the mutex is
// held and pushes its task onto *queue.
static void
io_can_rt_do_pop_read_err(io_can_rt_t *rt, struct sllist *queue,
		struct io_can_rt_read_err *read_err)
{
	assert(rt);
	assert(queue);
	assert(read_err);
	struct ev_task *task = &read_err->task;
	assert(task->_data == &rt->err_queue);

	if (sllist_remove(&rt->err_queue, &task->_node)) {
		// If no operations remain, cancel the outstanding channel
		// read.
		if (rt->submitted && rbtree_empty(&rt->msg_queue)
				&& sllist_empty(&rt->err_queue))
			io_can_chan_cancel_read(rt->chan, &rt->read);
		task->_data = NULL;
		sllist_push_back(queue, &task->_node);
	}
}
788 
789 static void
790 io_can_rt_read_msg_post(struct io_can_rt_read_msg *read_msg,
791  const struct can_msg *msg, int errc)
792 {
793  read_msg->r.msg = msg;
794  read_msg->r.errc = errc;
795 
796  ev_exec_t *exec = read_msg->task.exec;
797  ev_exec_post(exec, &read_msg->task);
799 }
800 
801 static size_t
802 io_can_rt_read_msg_queue_post(
803  struct sllist *queue, const struct can_msg *msg, int errc)
804 {
805  size_t n = 0;
806 
807  struct slnode *node;
808  while ((node = sllist_pop_front(queue))) {
809  struct ev_task *task = ev_task_from_node(node);
810  struct io_can_rt_read_msg *read_msg =
812  io_can_rt_read_msg_post(read_msg, msg, errc);
813  n += n < SIZE_MAX;
814  }
815 
816  return n;
817 }
818 
// Stores the result of a CAN error frame read operation and posts its
// completion task.
static void
io_can_rt_read_err_post(struct io_can_rt_read_err *read_err,
		const struct can_err *err, int errc)
{
	read_err->r.err = err;
	read_err->r.errc = errc;

	ev_exec_t *exec = read_err->task.exec;
	ev_exec_post(exec, &read_err->task);
	// Balance the ev_exec_on_task_init() from
	// io_can_rt_submit_read_err().
	ev_exec_on_task_fini(exec);
}
830 
831 static size_t
832 io_can_rt_read_err_queue_post(
833  struct sllist *queue, const struct can_err *err, int errc)
834 {
835  size_t n = 0;
836 
837  struct slnode *node;
838  while ((node = sllist_pop_front(queue))) {
839  struct ev_task *task = ev_task_from_node(node);
840  struct io_can_rt_read_err *read_err =
842  io_can_rt_read_err_post(read_err, err, errc);
843  n += n < SIZE_MAX;
844  }
845 
846  return n;
847 }
848 
849 static int
850 io_can_rt_read_msg_cmp(const void *p1, const void *p2)
851 {
852  const struct io_can_rt_read_msg *r1 = p1;
853  assert(r1);
854  const struct io_can_rt_read_msg *r2 = p2;
855  assert(r2);
856 
857  int cmp = (r2->id < r1->id) - (r1->id < r2->id);
858  if (!cmp)
859  cmp = (r2->flags < r1->flags) - (r1->flags < r2->flags);
860  return cmp;
861 }
862 
863 static void
864 io_can_rt_async_read_msg_func(struct ev_task *task)
865 {
866  assert(task);
867  struct io_can_rt_read_msg *read_msg =
869  struct io_can_rt_async_read_msg *async_read_msg = structof(
870  read_msg, struct io_can_rt_async_read_msg, read_msg);
871 
872  if (ev_promise_set_acquire(async_read_msg->promise)) {
873  if (read_msg->r.msg) {
874  async_read_msg->msg = *read_msg->r.msg;
875  read_msg->r.msg = &async_read_msg->msg;
876  }
877  ev_promise_set_release(async_read_msg->promise, &read_msg->r);
878  }
879  ev_promise_release(async_read_msg->promise);
880 }
881 
882 static void
883 io_can_rt_async_read_err_func(struct ev_task *task)
884 {
885  assert(task);
886  struct io_can_rt_read_err *read_err =
888  struct io_can_rt_async_read_err *async_read_err = structof(
889  read_err, struct io_can_rt_async_read_err, read_err);
890 
891  if (ev_promise_set_acquire(async_read_err->promise)) {
892  if (read_err->r.err) {
893  async_read_err->err = *read_err->r.err;
894  read_err->r.err = &async_read_err->err;
895  }
896  ev_promise_set_release(async_read_err->promise, &read_err->r);
897  }
898  ev_promise_release(async_read_err->promise);
899 }
900 
// The task function posted by io_can_rt_async_shutdown(); satisfies the
// shutdown promise with a NULL (void) value.
static void
io_can_rt_async_shutdown_func(struct ev_task *task)
{
	assert(task);
	struct io_can_rt_async_shutdown *async_shutdown =
			structof(task, struct io_can_rt_async_shutdown, task);

	ev_promise_set(async_shutdown->promise, NULL);
	ev_promise_release(async_shutdown->promise);
}
911 
912 #endif // !LELY_NO_MALLOC
#define CAN_MSG_INIT
The static initializer for can_msg.
Definition: msg.h:113
size_t io_can_rt_cancel_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Cancels the specified CAN error frame read operation if it is pending.
Definition: can_rt.c:425
io_can_rt_t * io_can_rt_create(io_can_chan_t *chan, ev_exec_t *exec)
Creates a new CAN frame router.
Definition: can_rt.c:225
void io_can_rt_submit_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Submits a CAN error frame read operation to a CAN frame router.
Definition: can_rt.c:391
io_dev_t * io_can_rt_get_dev(const io_can_rt_t *rt)
Returns a pointer to the abstract I/O device representing the CAN frame router.
Definition: can_rt.c:261
size_t io_can_rt_abort_read_err(io_can_rt_t *rt, struct io_can_rt_read_err *read_err)
Aborts the specified CAN error frame read operation if it is pending.
Definition: can_rt.c:448
ev_future_t * io_can_rt_async_read_err(io_can_rt_t *rt, struct io_can_rt_read_err **pread_err)
Submits an asynchronous CAN error frame read operation to a CAN frame router and creates a future whi...
Definition: can_rt.c:470
size_t io_can_rt_abort_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Aborts the specified CAN frame read operation if it is pending.
Definition: can_rt.c:343
void io_can_rt_destroy(io_can_rt_t *rt)
Destroys a CAN frame router.
Definition: can_rt.c:252
struct io_can_rt_read_err * io_can_rt_read_err_from_task(struct ev_task *task)
Obtains a pointer to a CAN error frame read operation from a pointer to its completion task.
Definition: can_rt.c:522
ev_future_t * io_can_rt_async_read_msg(io_can_rt_t *rt, uint_least32_t id, uint_least8_t flags, struct io_can_rt_read_msg **pread_msg)
Submits an asynchronous CAN frame read operation to a CAN frame router and creates a future which bec...
Definition: can_rt.c:365
void io_can_rt_submit_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Submits a CAN frame read operation to a CAN frame router.
Definition: can_rt.c:277
struct io_can_rt_read_msg * io_can_rt_read_msg_from_task(struct ev_task *task)
Obtains a pointer to a CAN frame read operation from a pointer to its completion task.
Definition: can_rt.c:516
size_t io_can_rt_cancel_read_msg(io_can_rt_t *rt, struct io_can_rt_read_msg *read_msg)
Cancels the specified CAN frame read operation if it is pending.
Definition: can_rt.c:320
io_can_chan_t * io_can_rt_get_chan(const io_can_rt_t *rt)
Returns a pointer to the CAN channel used by the CAN frame router.
Definition: can_rt.c:269
ev_future_t * io_can_rt_async_shutdown(io_can_rt_t *rt)
Shuts down a CAN frame router, cancels all pending operations and creates a (void) future which becom...
Definition: can_rt.c:495
This header file is part of the I/O library; it contains the CAN frame router declarations.
#define IO_CAN_RT_READ_MSG_INIT(id, flags, func)
The static initializer for io_can_rt_read_msg.
Definition: can_rt.h:77
#define IO_CAN_RT_READ_ERR_INIT(func)
The static initializer for io_can_rt_read_err.
Definition: can_rt.h:111
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
This header file is part of the utilities library; it contains the native and platform-independent er...
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition: errnum.c:810
@ ERRNUM_CANCELED
Operation canceled.
Definition: errnum.h:99
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
This header file is part of the event library; it contains the abstract task executor interface.
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:112
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
void ev_promise_release(ev_promise_t *promise)
Releases a reference to a promise.
Definition: future.c:198
int ev_promise_set(ev_promise_t *promise, void *value)
Satiesfies a promise, if it was not aready satisfied, and stores the specified value for retrieval by...
Definition: future.c:242
void ev_promise_set_release(ev_promise_t *promise, void *value)
Satisfies a promise prepared by ev_promise_set_acquire(), and stores the specified value for retrieva...
Definition: future.c:275
ev_future_t * ev_promise_get_future(ev_promise_t *promise)
Returns (a reference to) a future associated with the specified promise.
Definition: future.c:312
ev_promise_t * ev_promise_create(size_t size, ev_promise_dtor_t *dtor)
Constructs a new promise with an optional empty shared state.
Definition: future.c:154
void * ev_promise_data(const ev_promise_t *promise)
Returns a pointer to the shared state of a promise.
Definition: future.c:236
int ev_promise_set_acquire(ev_promise_t *promise)
Checks if the specified promise can be satisfied by the caller and, if so, prevents others from satis...
Definition: future.c:251
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
const struct io_can_chan_vtbl *const io_can_chan_t
An abstract CAN channel.
Definition: can.h:59
struct io_can_chan_read * io_can_chan_read_from_task(struct ev_task *task)
Obtains a pointer to a CAN channel read operation from a pointer to its completion task.
Definition: can.c:93
static size_t io_can_chan_abort_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Aborts the specified CAN channel read operation if it is pending.
Definition: can.h:500
static size_t io_can_chan_cancel_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Cancels the specified CAN channel read operation if it is pending.
Definition: can.h:494
io_dev_t * io_can_chan_get_dev(const io_can_chan_t *chan)
Returns a pointer to the abstract I/O device representing the CAN channel.
Definition: can.h:469
void io_can_chan_submit_read(io_can_chan_t *chan, struct io_can_chan_read *read)
Submits a read operation to a CAN channel.
Definition: can.h:488
#define IO_CAN_CHAN_READ_INIT(msg, err, tp, exec, func)
The static initializer for io_can_chan_read.
Definition: can.h:104
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
#define CAN_ERR_INIT
The static initializer for a can_err struct.
Definition: err.h:43
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
io_ctx_t * io_dev_get_ctx(const io_dev_t *dev)
Returns a pointer to the I/O context with which the I/O device is registered.
Definition: dev.h:80
ev_exec_t * io_dev_get_exec(const io_dev_t *dev)
Returns a pointer to the executor used by the I/O device to execute asynchronous tasks.
Definition: dev.h:86
void rbnode_init(struct rbnode *node, const void *key)
Initializes a node in a red-black tree.
Definition: rbtree.h:237
void rbtree_insert(struct rbtree *tree, struct rbnode *node)
Inserts a node into a red-black tree.
Definition: rbtree.c:108
void rbtree_init(struct rbtree *tree, rbtree_cmp_t *cmp)
Initializes a red-black tree.
Definition: rbtree.h:248
void rbtree_remove(struct rbtree *tree, struct rbnode *node)
Removes a node from a red-black tree.
Definition: rbtree.c:187
int rbtree_empty(const struct rbtree *tree)
Returns 1 if the red-black tree is empty, and 0 if not.
Definition: rbtree.h:259
#define rbtree_foreach(tree, node)
Iterates over each node in a red-black tree in ascending order.
Definition: rbtree.h:234
struct rbnode * rbtree_find(const struct rbtree *tree, const void *key)
Finds a node in a red-black tree.
Definition: rbtree.c:306
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
This is the internal header file of the I/O library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
This header file is part of the event library; it contains the strand executor declarations.
void ev_strand_destroy(ev_exec_t *exec)
Destroys a strand executor.
Definition: strand.c:180
ev_exec_t * ev_strand_create(ev_exec_t *inner_exec)
Creates a strand executor.
Definition: strand.c:153
A CAN error frame.
Definition: err.h:28
A CAN or CAN FD format frame.
Definition: msg.h:87
uint_least32_t id
The identifier (11 or 29 bits, depending on the CAN_FLAG_IDE flag).
Definition: msg.h:89
uint_least8_t flags
The flags (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI).
Definition: msg.h:94
A future.
Definition: future.c:66
A promise.
Definition: future.c:95
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
int result
The result of the read operation: 1 if a CAN frame is received, 0 if an error frame is received, and -1 on error (or if the operation is canceled).
Definition: can.h:68
int errc
The error number, obtained as if by get_errc(), if result is -1.
Definition: can.h:70
A CAN channel read operation.
Definition: can.h:74
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can.h:98
struct io_can_chan_read_result r
The result of the read operation.
Definition: can.h:100
struct can_msg * msg
The address at which to store the CAN frame.
Definition: can.h:80
struct can_err * err
The address at which to store the CAN error frame.
Definition: can.h:86
const struct can_err * err
A pointer to the received CAN error frame, or NULL on error (or if the operation is canceled).
Definition: can_rt.h:94
int errc
The error number, obtained as if by get_errc(), if err is NULL.
Definition: can_rt.h:96
A CAN error frame read operation suitable for use with a CAN frame router.
Definition: can_rt.h:100
struct io_can_rt_read_err_result r
The result of the read operation.
Definition: can_rt.h:107
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can_rt.h:105
int errc
The error number, obtained as if by get_errc(), if msg is NULL.
Definition: can_rt.h:48
const struct can_msg * msg
A pointer to the received CAN frame, or NULL on error (or if the operation is canceled).
Definition: can_rt.h:46
A CAN frame read operation suitable for use with a CAN frame router.
Definition: can_rt.h:52
uint_least32_t id
The identifier of the CAN frame to be received.
Definition: can_rt.h:57
struct io_can_rt_read_msg_result r
The result of the read operation.
Definition: can_rt.h:71
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the read operation.
Definition: can_rt.h:69
uint_least8_t flags
The flags of the CAN frame to be received (any combination of CAN_FLAG_IDE, CAN_FLAG_RTR, CAN_FLAG_FDF, CAN_FLAG_BRS and CAN_FLAG_ESI).
Definition: can_rt.h:64
Definition: ctx.c:38
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A node in a red-black tree.
Definition: rbtree.h:53
const void * key
A pointer to the key for this node.
Definition: rbtree.h:59
A red-black tree.
Definition: rbtree.h:91
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads.h>.
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109