Lely core libraries  2.2.5
fiber_exec.c
Go to the documentation of this file.
1 
24 #include "ev.h"
25 #if !LELY_NO_THREADS
26 #include <lely/libc/threads.h>
27 #endif
28 #include <lely/ev/exec.h>
29 #include <lely/ev/fiber_exec.h>
30 #include <lely/ev/task.h>
31 #include <lely/util/errnum.h>
32 #include <lely/util/util.h>
33 
34 #include <assert.h>
35 #include <stdint.h>
36 #include <stdlib.h>
37 
38 #ifndef LELY_EV_FIBER_MAX_UNUSED
40 #define LELY_EV_FIBER_MAX_UNUSED 16
41 #endif
42 
43 struct ev_fiber_ctx;
44 
49 #if LELY_NO_THREADS
50 static struct ev_fiber_thrd {
51 #else
53 #endif
58  size_t refcnt;
60  int flags;
62  size_t stack_size;
64  size_t max_unused;
68  size_t num_unused;
70  struct ev_fiber_ctx *curr;
74 
75 static void ev_fiber_exec_on_task_init(ev_exec_t *exec);
76 static void ev_fiber_exec_on_task_fini(ev_exec_t *exec);
77 static int ev_fiber_exec_dispatch(ev_exec_t *exec, struct ev_task *task);
78 static void ev_fiber_exec_post(ev_exec_t *exec, struct ev_task *task);
79 static void ev_fiber_exec_defer(ev_exec_t *exec, struct ev_task *task);
80 static size_t ev_fiber_exec_abort(ev_exec_t *exec, struct ev_task *task);
81 
82 // clang-format off
83 static const struct ev_exec_vtbl ev_fiber_exec_vtbl = {
84  &ev_fiber_exec_on_task_init,
85  &ev_fiber_exec_on_task_fini,
86  &ev_fiber_exec_dispatch,
87  &ev_fiber_exec_post,
88  &ev_fiber_exec_defer,
89  &ev_fiber_exec_abort,
90  NULL
91 };
92 // clang-format on
93 
95 struct ev_fiber_exec {
97  const struct ev_exec_vtbl *exec_vptr;
101  struct ev_task task;
105  size_t pending;
106 #if !LELY_NO_THREADS
109 #endif
111  int posted;
113  struct sllist queue;
114 };
115 
116 static void ev_fiber_exec_func(struct ev_task *task);
117 
118 static inline struct ev_fiber_exec *ev_fiber_exec_from_exec(
119  const ev_exec_t *exec);
120 
121 static int ev_fiber_exec_post_ctx(struct ev_fiber_exec *exec);
122 
127 struct ev_fiber_ctx {
135  struct ev_task task;
137  struct sllist queue;
138 };
139 
140 static struct ev_fiber_ctx *ev_fiber_ctx_create(struct ev_fiber_exec *exec);
141 static void ev_fiber_ctx_destroy(struct ev_fiber_ctx *ctx);
142 
143 static fiber_t *ev_fiber_ctx_fiber_func(fiber_t *fiber, void *arg);
144 static void ev_fiber_ctx_task_func(struct ev_task *task);
145 
146 static struct ev_fiber_ctx *ev_fiber_resume(fiber_t *fiber);
147 static fiber_t *ev_fiber_await_func(fiber_t *fiber, void *arg);
148 
149 static void ev_fiber_return(void);
150 static fiber_t *ev_fiber_return_func(fiber_t *fiber, void *arg);
151 
155  int type;
156 #if !LELY_NO_THREADS
159 #endif
161  struct ev_fiber_ctx *ctx;
163  size_t locked;
165  struct sllist queue;
166 };
167 
168 static fiber_t *ev_fiber_mtx_lock_func(fiber_t *fiber, void *arg);
169 
176  struct slnode node;
182  struct ev_task *task;
183 };
184 
185 static fiber_t *ev_fiber_cnd_wait_func(fiber_t *fiber, void *arg);
186 
189 #if !LELY_NO_THREADS
192 #endif
197  struct sllist queue;
198 };
199 
200 static void ev_fiber_cnd_wake(struct slnode *node);
201 
202 int
204 {
205  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
206 
207  if (thr->refcnt++)
208  return 1;
209 
210  int result = fiber_thrd_init(flags & ~FIBER_GUARD_STACK);
211  if (result == -1) {
212  thr->refcnt--;
213  } else {
214  thr->flags = flags;
215  thr->stack_size = stack_size;
216 
217  if (!(thr->max_unused = max_unused))
219  thr->unused = NULL;
220  thr->num_unused = 0;
221 
222  thr->curr = NULL;
223  thr->prev = NULL;
224  }
225  return result;
226 }
227 
228 void
230 {
231  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
232  assert(!thr->curr);
233 
234  if (--thr->refcnt)
235  return;
236 
237  struct ev_fiber_ctx *ctx;
238  while ((ctx = thr->unused)) {
239  thr->unused = ctx->next;
240  fiber_destroy(ctx->fiber);
241  }
242 
243  fiber_thrd_fini();
244 }
245 
246 void *
247 ev_fiber_exec_alloc(void)
248 {
249  struct ev_fiber_exec *exec = malloc(sizeof(*exec));
250  if (!exec)
251  set_errc(errno2c(errno));
252  // cppcheck-suppress memleak symbolName=exec
253  return exec ? &exec->exec_vptr : NULL;
254 }
255 
/*
 * Frees the memory obtained from ev_fiber_exec_alloc(). A NULL pointer is
 * ignored.
 */
void
ev_fiber_exec_free(void *ptr)
{
	// The guard is required: the conversion below asserts a non-NULL
	// pointer and subtracts a member offset from it.
	if (!ptr)
		return;

	free(ev_fiber_exec_from_exec(ptr));
}
262 
263 ev_exec_t *
264 ev_fiber_exec_init(ev_exec_t *exec_, ev_exec_t *inner_exec)
265 {
266  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
267  assert(thr->refcnt);
268  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
269  assert(inner_exec);
270 
271  exec->exec_vptr = &ev_fiber_exec_vtbl;
272 
273  exec->inner_exec = inner_exec;
274  exec->task = (struct ev_task)EV_TASK_INIT(
275  exec->inner_exec, &ev_fiber_exec_func);
276 
277  exec->thr = thr;
278 
279  exec->pending = 0;
280 
281 #if !LELY_NO_THREADS
282  if (mtx_init(&exec->mtx, mtx_plain) != thrd_success)
283  return NULL;
284 #endif
285 
286  exec->posted = 0;
287 
288  sllist_init(&exec->queue);
289 
290  return exec_;
291 }
292 
293 void
294 ev_fiber_exec_fini(ev_exec_t *exec_)
295 {
296  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
297  assert(exec->thr == &ev_fiber_thrd);
298 
299  ev_fiber_exec_abort(exec_, NULL);
300 
301 #if !LELY_NO_THREADS
302  mtx_lock(&exec->mtx);
303 #endif
304  // Abort ev_fiber_exec_func().
305  if (exec->posted && ev_exec_abort(exec->task.exec, &exec->task))
306  exec->posted = 0;
307  assert(exec->posted == 0);
308 #if !LELY_NO_THREADS
309  mtx_unlock(&exec->mtx);
310 
311  mtx_destroy(&exec->mtx);
312 #endif
313 }
314 
315 ev_exec_t *
317 {
318  int errc = 0;
319 
320  ev_exec_t *exec = ev_fiber_exec_alloc();
321  if (!exec) {
322  errc = get_errc();
323  goto error_alloc;
324  }
325 
326  ev_exec_t *tmp = ev_fiber_exec_init(exec, inner_exec);
327  if (!tmp) {
328  errc = get_errc();
329  goto error_init;
330  }
331  exec = tmp;
332 
333  return exec;
334 
335 error_init:
336  ev_fiber_exec_free((void *)exec);
337 error_alloc:
338  set_errc(errc);
339  return NULL;
340 }
341 
342 void
344 {
345  if (exec) {
346  ev_fiber_exec_fini(exec);
347  ev_fiber_exec_free((void *)exec);
348  }
349 }
350 
351 ev_exec_t *
353 {
354  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
355 
356  return exec->inner_exec;
357 }
358 
359 void
361 {
362  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
363  assert(thr->curr);
364 
365  thr->prev = fiber_resume_with(thr->prev, ev_fiber_await_func, future);
366 }
367 
368 int
370 {
371  assert(mtx);
372 
373  if (type & ~ev_fiber_mtx_recursive) {
375  return ev_fiber_error;
376  }
377 
378  struct ev_fiber_mtx_impl *impl = malloc(sizeof(*impl));
379  if (!impl)
380  return ev_fiber_nomem;
381 
382  impl->type = type;
383 
384 #if !LELY_NO_THREADS
385  switch (mtx_init(&impl->mtx, mtx_plain)) {
386  case thrd_error: free(impl); return ev_fiber_error;
387  case thrd_nomem: free(impl); return ev_fiber_nomem;
388  default: break;
389  }
390 #endif
391 
392  impl->ctx = NULL;
393  impl->locked = 0;
394  sllist_init(&impl->queue);
395 
396  mtx->_impl = impl;
397 
398  return ev_fiber_success;
399 }
400 
401 void
403 {
404  assert(mtx);
405  struct ev_fiber_mtx_impl *impl = mtx->_impl;
406  assert(impl);
407 
408  assert(!impl->locked);
409  assert(sllist_empty(&impl->queue));
410 #if !LELY_NO_THREADS
411  mtx_destroy(&impl->mtx);
412 #endif
413  free(impl);
414 
415  mtx->_impl = NULL;
416 }
417 
418 int
420 {
421  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
422  assert(mtx);
423  struct ev_fiber_mtx_impl *impl = mtx->_impl;
424  assert(impl);
425 
426  if (!thr->curr) {
428  return ev_fiber_error;
429  }
430 
431 #if !LELY_NO_THREADS
432  mtx_lock(&impl->mtx);
433 #endif
434  if (impl->locked) {
435  if (impl->ctx == thr->curr) {
436  if (!(impl->type & ev_fiber_mtx_recursive)) {
437 #if !LELY_NO_THREADS
438  mtx_unlock(&impl->mtx);
439 #endif
441  return ev_fiber_error;
442  }
443  assert(impl->locked < SIZE_MAX);
444  impl->locked++;
445  } else {
446  // The inner mutex will be unlocked in
447  // ev_fiber_mtx_lock_func().
448  thr->prev = fiber_resume_with(thr->prev,
449  ev_fiber_mtx_lock_func, impl);
450  impl->ctx = thr->curr;
451  assert(impl->locked == 1);
452  }
453  } else {
454  assert(impl->ctx == NULL);
455  assert(sllist_empty(&impl->queue));
456  impl->ctx = thr->curr;
457  impl->locked = 1;
458 #if !LELY_NO_THREADS
459  mtx_unlock(&impl->mtx);
460 #endif
461  }
462 
463  return ev_fiber_success;
464 }
465 
466 int
468 {
469  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
470  assert(mtx);
471  struct ev_fiber_mtx_impl *impl = mtx->_impl;
472  assert(impl);
473 
474  if (!thr->curr) {
476  return ev_fiber_error;
477  }
478 
479 #if !LELY_NO_THREADS
480  mtx_lock(&impl->mtx);
481 #endif
482  if (impl->locked) {
483  if (impl->ctx == thr->curr) {
484  if (!(impl->type & ev_fiber_mtx_recursive)) {
485 #if !LELY_NO_THREADS
486  mtx_unlock(&impl->mtx);
487 #endif
489  return ev_fiber_error;
490  }
491  assert(impl->locked < SIZE_MAX);
492  impl->locked++;
493 #if !LELY_NO_THREADS
494  mtx_unlock(&impl->mtx);
495 #endif
496  return ev_fiber_success;
497  } else {
498 #if !LELY_NO_THREADS
499  mtx_unlock(&impl->mtx);
500 #endif
501  return ev_fiber_busy;
502  }
503  } else {
504  assert(impl->ctx == NULL);
505  assert(sllist_empty(&impl->queue));
506  impl->ctx = thr->curr;
507  impl->locked = 1;
508 #if !LELY_NO_THREADS
509  mtx_unlock(&impl->mtx);
510 #endif
511  return ev_fiber_success;
512  }
513 }
514 
515 int
517 {
518  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
519  assert(mtx);
520  struct ev_fiber_mtx_impl *impl = mtx->_impl;
521  assert(impl);
522 
523  if (!thr->curr) {
525  return ev_fiber_error;
526  }
527 
528  struct ev_task *task = NULL;
529 
530 #if !LELY_NO_THREADS
531  mtx_lock(&impl->mtx);
532 #endif
533  if (impl->ctx != thr->curr || !impl->locked) {
534 #if !LELY_NO_THREADS
535  mtx_unlock(&impl->mtx);
536 #endif
538  return ev_fiber_error;
539  }
540  assert((impl->type & ev_fiber_mtx_recursive) || impl->locked == 1);
541  if (!--impl->locked) {
542  impl->ctx = NULL;
543  if ((task = ev_task_from_node(sllist_pop_front(&impl->queue))))
544  impl->locked++;
545  }
546 #if !LELY_NO_THREADS
547  mtx_unlock(&impl->mtx);
548 #endif
549 
550  if (task)
551  ev_exec_post(task->exec, task);
552 
553  return ev_fiber_success;
554 }
555 
556 int
558 {
559  assert(cond);
560 
561  struct ev_fiber_cnd_impl *impl = malloc(sizeof(*impl));
562  if (!impl)
563  return ev_fiber_nomem;
564 
565 #if !LELY_NO_THREADS
566  switch (mtx_init(&impl->mtx, mtx_plain)) {
567  case thrd_error: free(impl); return ev_fiber_error;
568  case thrd_nomem: free(impl); return ev_fiber_nomem;
569  default: break;
570  }
571 #endif
572 
573  sllist_init(&impl->queue);
574 
575  cond->_impl = impl;
576 
577  return ev_fiber_success;
578 }
579 
580 void
582 {
583  assert(cond);
584  struct ev_fiber_cnd_impl *impl = cond->_impl;
585  assert(impl);
586 
587  assert(sllist_empty(&impl->queue));
588 #if !LELY_NO_THREADS
589  mtx_destroy(&impl->mtx);
590 #endif
591  free(impl);
592 
593  cond->_impl = NULL;
594 }
595 
596 int
598 {
599  assert(cond);
600  struct ev_fiber_cnd_impl *impl = cond->_impl;
601  assert(impl);
602 
603 #if !LELY_NO_THREADS
604  mtx_lock(&impl->mtx);
605 #endif
606  struct slnode *node = sllist_pop_front(&impl->queue);
607 #if !LELY_NO_THREADS
608  mtx_unlock(&impl->mtx);
609 #endif
610 
611  if (node)
612  ev_fiber_cnd_wake(node);
613 
614  return ev_fiber_success;
615 }
616 
617 int
619 {
620  assert(cond);
621  struct ev_fiber_cnd_impl *impl = cond->_impl;
622  assert(impl);
623 
624  struct sllist queue;
625  sllist_init(&queue);
626 
627 #if !LELY_NO_THREADS
628  mtx_lock(&impl->mtx);
629 #endif
630  sllist_append(&queue, &impl->queue);
631 #if !LELY_NO_THREADS
632  mtx_unlock(&impl->mtx);
633 #endif
634 
635  struct slnode *node;
636  while ((node = sllist_pop_front(&queue)))
637  ev_fiber_cnd_wake(node);
638 
639  return ev_fiber_success;
640 }
641 
642 int
644 {
645  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
646  assert(mtx);
647  struct ev_fiber_mtx_impl *impl = mtx->_impl;
648  assert(impl);
649 
650  if (!thr->curr) {
652  return ev_fiber_error;
653  }
654 
655  struct ev_fiber_cnd_wait wait = { .cond = cond, .mtx = mtx };
656 
657 #if !LELY_NO_THREADS
658  mtx_lock(&impl->mtx);
659 #endif
660  if (impl->ctx != thr->curr || impl->locked != 1) {
661 #if !LELY_NO_THREADS
662  mtx_unlock(&impl->mtx);
663 #endif
665  return ev_fiber_error;
666  }
667  assert(impl->locked);
668 
669  // The inner mutex will be unlocked in ev_fiber_cnd_wait_func().
670  thr->prev = fiber_resume_with(thr->prev, ev_fiber_cnd_wait_func, &wait);
671 
672 #if !LELY_NO_THREADS
673  mtx_lock(&impl->mtx);
674 #endif
675  assert(!impl->ctx);
676  impl->ctx = thr->curr;
677  assert(impl->locked == 1);
678 #if !LELY_NO_THREADS
679  mtx_unlock(&impl->mtx);
680 #endif
681 
682  return ev_fiber_success;
683 }
684 
685 static void
686 ev_fiber_exec_on_task_init(ev_exec_t *exec_)
687 {
688  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
689 
691 }
692 
693 static void
694 ev_fiber_exec_on_task_fini(ev_exec_t *exec_)
695 {
696  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
697 
699 }
700 
701 static int
702 ev_fiber_exec_dispatch(ev_exec_t *exec_, struct ev_task *task)
703 {
704  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
705  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
706  assert(task);
707  assert(!task->exec || task->exec == exec_);
708 
709  // Post the task if the executor is not currently executing a fiber.
710  struct ev_fiber_ctx *ctx = thr->curr;
711  if (!ctx || ctx->exec != exec) {
712  ev_fiber_exec_post(exec_, task);
713  return 0;
714  }
715 
716  if (!task->exec)
717  task->exec = exec_;
718 
719  // Execute the task immediately.
720  if (task->func)
721  task->func(task);
722 
723  return 1;
724 }
725 
726 static void
727 ev_fiber_exec_post(ev_exec_t *exec_, struct ev_task *task)
728 {
729  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
730  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
731  assert(task);
732  assert(!task->exec || task->exec == exec_);
733 
734  if (!task->exec)
735  task->exec = exec_;
736  ev_fiber_exec_on_task_init(exec_);
737 
738  // Append the task to the queue.
739 #if !LELY_NO_THREADS
740  mtx_lock(&exec->mtx);
741 #endif
742  sllist_push_back(&exec->queue, &task->_node);
743  int post = !exec->posted && exec->thr != thr;
744  if (post)
745  exec->posted = 1;
746 #if !LELY_NO_THREADS
747  mtx_unlock(&exec->mtx);
748 #endif
749 
750  // If no pending fibers are available (and the calling thread is the
751  // thread of this executor), try to post a new fiber immediately.
752  if (exec->thr == thr && !exec->pending)
753  ev_fiber_exec_post_ctx(exec);
754  // Otherwise, post a task to post a fiber.
755  if (post)
756  ev_exec_post(exec->task.exec, &exec->task);
757 }
758 
759 static void
760 ev_fiber_exec_defer(ev_exec_t *exec_, struct ev_task *task)
761 {
762  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
763  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
764  assert(task);
765  assert(!task->exec || task->exec == exec_);
766 
767  // Post the task if the executor is not currently executing a fiber.
768  struct ev_fiber_ctx *ctx = thr->curr;
769  if (!ctx || ctx->exec != exec) {
770  ev_fiber_exec_post(exec_, task);
771  return;
772  }
773 
774  if (!task->exec)
775  task->exec = exec_;
776  ev_fiber_exec_on_task_init(exec_);
777 
778  // Push the task to the deferred queue of the fiber.
779  sllist_push_back(&ctx->queue, &task->_node);
780 }
781 
782 static size_t
783 ev_fiber_exec_abort(ev_exec_t *exec_, struct ev_task *task)
784 {
785  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
786  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
787 
788  struct sllist queue;
789  sllist_init(&queue);
790 
791  struct ev_fiber_ctx *ctx = thr->curr;
792  if (ctx && ctx->exec == exec) {
793  // Try to find and abort the task(s) in the deferred queue of
794  // the fiber.
795  if (!task)
796  sllist_append(&queue, &ctx->queue);
797  else if (sllist_remove(&ctx->queue, &task->_node))
798  sllist_push_back(&queue, &task->_node);
799  }
800 
801 #if !LELY_NO_THREADS
802  mtx_lock(&exec->mtx);
803 #endif
804  if (!task)
806  else if (sllist_remove(&exec->queue, &task->_node))
807  sllist_push_back(&queue, &task->_node);
808 #if !LELY_NO_THREADS
809  mtx_unlock(&exec->mtx);
810 #endif
811 
812  size_t n = 0;
813  while (sllist_pop_front(&queue)) {
814  ev_fiber_exec_on_task_fini(exec_);
815  n += n < SIZE_MAX;
816  }
817  return n;
818 }
819 
820 static void
821 ev_fiber_exec_func(struct ev_task *task)
822 {
823  assert(task);
824  struct ev_fiber_exec *exec = structof(task, struct ev_fiber_exec, task);
825  assert(exec->thr == &ev_fiber_thrd);
826 
827 #if !LELY_NO_THREADS
828  mtx_lock(&exec->mtx);
829 #endif
830  assert(exec->posted);
831  exec->posted = 0;
832 #if !LELY_NO_THREADS
833  mtx_unlock(&exec->mtx);
834 #endif
835 
836  // If no pending fibers are available, try to post a new fiber.
837  if (!exec->pending)
838  ev_fiber_exec_post_ctx(exec);
839 }
840 
841 static inline struct ev_fiber_exec *
842 ev_fiber_exec_from_exec(const ev_exec_t *exec)
843 {
844  assert(exec);
845 
846  return structof(exec, struct ev_fiber_exec, exec_vptr);
847 }
848 
849 static int
850 ev_fiber_exec_post_ctx(struct ev_fiber_exec *exec)
851 {
852  int errsv = get_errc();
853  struct ev_fiber_ctx *ctx = ev_fiber_ctx_create(exec);
854  if (!ctx) {
855  // Ignore the error; one of the existing fibers can pick up the
856  // submitted task.
857  set_errc(errsv);
858  return -1;
859  }
860 
861  exec->pending++;
862  ev_exec_post(ctx->task.exec, &ctx->task);
863 
864  return 0;
865 }
866 
867 static struct ev_fiber_ctx *
868 ev_fiber_ctx_create(struct ev_fiber_exec *exec)
869 {
870  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
871  assert(thr->refcnt);
872  assert(exec);
873  assert(exec->thr == thr);
874 
875  // Use a fiber from the unused list before creating a new one.
876  struct ev_fiber_ctx *ctx = thr->unused;
877  if (ctx) {
878  thr->unused = ctx->next;
879  ctx->next = NULL;
880  assert(thr->num_unused);
881  thr->num_unused--;
882  } else {
883  fiber_t *fiber = fiber_create(&ev_fiber_ctx_fiber_func, NULL,
884  thr->flags, sizeof(struct ev_fiber_ctx),
885  thr->stack_size);
886  if (!fiber)
887  return NULL;
888  ctx = fiber_data(fiber);
889  *ctx = (struct ev_fiber_ctx){
890  .fiber = fiber,
891  };
892  sllist_init(&ctx->queue);
893  }
894 
895  // Associate the fiber with the executor.
896  ctx->exec = exec;
897  ctx->task = (struct ev_task)EV_TASK_INIT(
898  exec->inner_exec, &ev_fiber_ctx_task_func);
899 
900  return ctx;
901 }
902 
903 static void
904 ev_fiber_ctx_destroy(struct ev_fiber_ctx *ctx)
905 {
906  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
907  assert(thr->refcnt);
908 
909  if (ctx) {
910  assert(sllist_empty(&ctx->queue));
911  if (thr->num_unused < thr->max_unused) {
912  ctx->next = thr->unused;
913  thr->unused = ctx;
914  thr->num_unused++;
915  } else {
916  fiber_destroy(ctx->fiber);
917  }
918  }
919 }
920 
921 static fiber_t *
922 ev_fiber_ctx_fiber_func(fiber_t *fiber, void *arg)
923 {
924  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
925  (void)arg;
926  struct ev_fiber_ctx *ctx = fiber_data(NULL);
927 
928  thr->prev = fiber;
929 
930  struct ev_task *task = NULL;
931  for (;;) {
932  if (!task) {
933  // Check if there are any tasks waiting on the queue of
934  // the executor.
935  assert(ctx->exec->pending);
936  ctx->exec->pending--;
937 #if !LELY_NO_THREADS
938  mtx_lock(&ctx->exec->mtx);
939 #endif
940  task = ev_task_from_node(
941  sllist_pop_front(&ctx->exec->queue));
942 #if !LELY_NO_THREADS
943  mtx_unlock(&ctx->exec->mtx);
944 #endif
945  // If no tasks are available, return and destroy the
946  // fiber or put it on the unused list.
947  if (!task) {
948  ev_fiber_return();
949  continue;
950  }
951  }
952 
953  // Execute the task.
954  ev_exec_t *exec = task->exec;
955  assert(exec == &ctx->exec->exec_vptr);
956  if (task->func)
957  task->func(task);
958  ev_fiber_exec_on_task_fini(exec);
959 
960  // Check if there are any deferred tasks remaining.
962  if (!task)
963  ctx->exec->pending++;
964 
965  ev_fiber_await(NULL);
966  }
967 
968  return NULL;
969 }
970 
971 static void
972 ev_fiber_ctx_task_func(struct ev_task *task)
973 {
974  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
975  assert(task);
976  struct ev_fiber_ctx *ctx = structof(task, struct ev_fiber_ctx, task);
977 
978  thr->curr = ctx;
979  fiber_t *fiber = fiber_resume(ctx->fiber);
980  assert(!fiber);
981  (void)fiber;
982 }
983 
984 static struct ev_fiber_ctx *
985 ev_fiber_resume(fiber_t *fiber)
986 {
987  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
988 
989  struct ev_fiber_ctx *ctx = thr->curr;
990  thr->curr = NULL;
991  ctx->fiber = fiber;
992 
993  // If there are tasks on the queue, but no pending fibers to execute
994  // them, try to post a new fiber.
995  struct ev_fiber_exec *exec = ctx->exec;
996  if (!exec->pending) {
997 #if !LELY_NO_THREADS
998  mtx_lock(&exec->mtx);
999 #endif
1000  int empty = sllist_empty(&exec->queue);
1001 #if !LELY_NO_THREADS
1002  mtx_unlock(&exec->mtx);
1003 #endif
1004  if (!empty)
1005  ev_fiber_exec_post_ctx(exec);
1006  }
1007 
1008  return ctx;
1009 }
1010 
1011 static fiber_t *
1012 ev_fiber_await_func(fiber_t *fiber, void *arg)
1013 {
1014  ev_future_t *future = arg;
1015 
1016  struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1017 
1018  if (future)
1019  ev_future_submit(future, &ctx->task);
1020  else
1021  ev_exec_post(ctx->task.exec, &ctx->task);
1022 
1023  return NULL;
1024 }
1025 
1026 static void
1027 ev_fiber_return(void)
1028 {
1029  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
1030  assert(thr->curr);
1031 
1032  thr->prev = fiber_resume_with(thr->prev, ev_fiber_return_func, NULL);
1033 }
1034 
1035 static fiber_t *
1036 ev_fiber_return_func(fiber_t *fiber, void *arg)
1037 {
1038  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
1039  (void)arg;
1040 
1041  struct ev_fiber_ctx *ctx = thr->curr;
1042  thr->curr = NULL;
1043  ctx->fiber = fiber;
1044 
1045  // Destroy the fiber or put it back on the unused list.
1046  ev_fiber_ctx_destroy(ctx);
1047 
1048  return NULL;
1049 }
1050 
1051 static fiber_t *
1052 ev_fiber_mtx_lock_func(fiber_t *fiber, void *arg)
1053 {
1054  struct ev_fiber_mtx_impl *impl = arg;
1055  assert(impl);
1056 
1057  struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1058 
1059  // Append the fiber to the queue of waiting fibers.
1060  sllist_push_back(&impl->queue, &ctx->task._node);
1061 #if !LELY_NO_THREADS
1062  // Unlock the inner mutex locked in ev_fiber_mtx_lock().
1063  mtx_unlock(&impl->mtx);
1064 #endif
1065 
1066  return NULL;
1067 }
1068 
1069 static fiber_t *
1070 ev_fiber_cnd_wait_func(fiber_t *fiber, void *arg)
1071 {
1072  struct ev_fiber_cnd_wait *wait = arg;
1073  assert(wait);
1074  ev_fiber_cnd_t *cond = wait->cond;
1075  assert(cond);
1076  struct ev_fiber_cnd_impl *cond_impl = cond->_impl;
1077  assert(cond_impl);
1078  ev_fiber_mtx_t *mtx = wait->mtx;
1079  struct ev_fiber_mtx_impl *mtx_impl = mtx->_impl;
1080  assert(mtx_impl);
1081 
1082  struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1083 
1084  wait->task = &ctx->task;
1085 #if !LELY_NO_THREADS
1086  mtx_lock(&cond_impl->mtx);
1087 #endif
1088  sllist_push_back(&cond_impl->queue, &wait->node);
1089 #if !LELY_NO_THREADS
1090  mtx_unlock(&cond_impl->mtx);
1091 #endif
1092 
1093  // Unlock the mutex for the duration of the wait.
1094  mtx_impl->ctx = NULL;
1095  assert(mtx_impl->locked == 1);
1096  mtx_impl->locked = 0;
1097  struct ev_task *task =
1099  if (task)
1100  mtx_impl->locked++;
1101 #if !LELY_NO_THREADS
1102  mtx_unlock(&mtx_impl->mtx);
1103 #endif
1104  // cppcheck-suppress duplicateCondition
1105  if (task)
1106  ev_exec_post(task->exec, task);
1107 
1108  return NULL;
1109 }
1110 
1111 static void
1112 ev_fiber_cnd_wake(struct slnode *node)
1113 {
1114  assert(node);
1115  struct ev_fiber_cnd_wait *wait =
1116  structof(node, struct ev_fiber_cnd_wait, node);
1117  ev_fiber_mtx_t *mtx = wait->mtx;
1118  assert(mtx);
1119  struct ev_fiber_mtx_impl *impl = mtx->_impl;
1120  assert(impl);
1121  struct ev_task *task = wait->task;
1122  assert(task);
1123 
1124 #if !LELY_NO_THREADS
1125  mtx_lock(&impl->mtx);
1126 #endif
1127  if (impl->locked) {
1128  sllist_push_back(&impl->queue, &task->_node);
1129  task = NULL;
1130  } else {
1131  assert(sllist_empty(&impl->queue));
1132  impl->locked = 1;
1133  }
1134 #if !LELY_NO_THREADS
1135  mtx_unlock(&impl->mtx);
1136 #endif
1137 
1138  if (task)
1139  ev_exec_post(task->exec, task);
1140 }
This header file is part of the utilities library; it contains the native and platform-independent er...
@ ERRNUM_PERM
Operation not permitted.
Definition: errnum.h:205
@ ERRNUM_INVAL
Invalid argument.
Definition: errnum.h:129
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:947
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:957
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:43
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:375
This header file is part of the event library; it contains the abstract task executor interface.
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks if task is NULL.
Definition: exec.h:138
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:126
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:114
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:108
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread storage duration.
Definition: features.h:239
ev_exec_t * ev_fiber_exec_get_inner_exec(const ev_exec_t *exec_)
Returns a pointer to the inner executor of a fiber executor.
Definition: fiber_exec.c:352
int ev_fiber_mtx_lock(ev_fiber_mtx_t *mtx)
Suspends the currently running fiber until it locks the fiber mutex at mtx.
Definition: fiber_exec.c:419
void ev_fiber_thrd_fini(void)
Finalizes the calling thread and prevents further use by fiber executors.
Definition: fiber_exec.c:229
int ev_fiber_mtx_init(ev_fiber_mtx_t *mtx, int type)
Creates a fiber mutex object with properties indicated by type, which must have one of the four value...
Definition: fiber_exec.c:369
int ev_fiber_thrd_init(int flags, size_t stack_size, size_t max_unused)
Initializes the calling thread for use by fiber executors.
Definition: fiber_exec.c:203
int ev_fiber_mtx_unlock(ev_fiber_mtx_t *mtx)
Unlocks the fiber mutex at mtx.
Definition: fiber_exec.c:516
void ev_fiber_cnd_destroy(ev_fiber_cnd_t *cond)
Releases all resources used by the fiber condition variable at cond.
Definition: fiber_exec.c:581
void ev_fiber_mtx_destroy(ev_fiber_mtx_t *mtx)
Releases any resources used by the fiber mutex at mtx.
Definition: fiber_exec.c:402
int ev_fiber_cnd_signal(ev_fiber_cnd_t *cond)
Unblocks one of the fibers that are blocked on the fiber condition variable at cond at the time of the call.
Definition: fiber_exec.c:597
int ev_fiber_cnd_broadcast(ev_fiber_cnd_t *cond)
Unblocks all of the fibers that are blocked on the fiber condition variable at cond at the time of the call.
Definition: fiber_exec.c:618
int ev_fiber_cnd_init(ev_fiber_cnd_t *cond)
Creates a fiber condition variable.
Definition: fiber_exec.c:557
void ev_fiber_exec_destroy(ev_exec_t *exec)
Destroys a fiber executor.
Definition: fiber_exec.c:343
void ev_fiber_await(ev_future_t *future)
Suspends a currently running fiber until the specified future becomes ready (or is cancelled).
Definition: fiber_exec.c:360
#define LELY_EV_FIBER_MAX_UNUSED
The maximum number of unused fibers per thread.
Definition: fiber_exec.c:40
int ev_fiber_cnd_wait(ev_fiber_cnd_t *cond, ev_fiber_mtx_t *mtx)
Atomically unlocks the fiber mutex at mtx and endeavors to block until the fiber condition variable a...
Definition: fiber_exec.c:643
int ev_fiber_mtx_trylock(ev_fiber_mtx_t *mtx)
Endeavors to lock the fiber mutex at mtx.
Definition: fiber_exec.c:467
ev_exec_t * ev_fiber_exec_create(ev_exec_t *inner_exec)
Creates a fiber executor.
Definition: fiber_exec.c:316
This header file is part of the event library; it contains the fiber executor, mutex and condition variable declarations.
@ ev_fiber_nomem
Indicates that the requested operation failed because it was unable to allocate memory.
Definition: fiber_exec.h:62
@ ev_fiber_busy
Indicates that the requested operation failed because a resource requested by a test and return funct...
Definition: fiber_exec.h:57
@ ev_fiber_error
Indicates that the requested operation failed.
Definition: fiber_exec.h:47
@ ev_fiber_success
Indicates that the requested operation succeeded.
Definition: fiber_exec.h:45
@ ev_fiber_mtx_recursive
A fiber mutex type that supports recursive locking.
Definition: fiber_exec.h:74
void ev_future_submit(ev_future_t *future, struct ev_task *task)
Submits a task to be executed once the specified future is ready.
Definition: future.c:360
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
#define FIBER_GUARD_STACK
A flag specifying a fiber to add a guard page when allocating the stack frame so that the kernel gene...
Definition: fiber.h:77
void * fiber_data(const fiber_t *fiber)
Returns a pointer to the data region of the specified fiber, or of the calling fiber if fiber is NULL...
Definition: fiber-sjlj.c:314
fiber_t * fiber_resume_with(fiber_t *fiber, fiber_func_t *func, void *arg)
Suspends the calling fiber and resumes the specified fiber, optionally executing a function before re...
Definition: fiber-sjlj.c:334
int fiber_thrd_init(int flags)
Initializes the fiber associated with the calling thread.
Definition: fiber-sjlj.c:108
fiber_t * fiber_resume(fiber_t *fiber)
Equivalent to fiber_resume_with(fiber, NULL, NULL).
Definition: fiber-sjlj.c:322
void fiber_thrd_fini(void)
Finalizes the fiber associated with the calling thread.
Definition: fiber-sjlj.c:135
void fiber_destroy(fiber_t *fiber)
Destroys the specified fiber.
Definition: fiber-sjlj.c:299
fiber_t * fiber_create(fiber_func_t *func, void *arg, int flags, size_t data_size, size_t stack_size)
Creates a new fiber, allocates a stack and sets up a calling environment to begin executing the speci...
Definition: fiber-sjlj.c:152
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:184
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:233
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:213
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:190
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:221
This is the internal header file of the event library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdint.h> and defines any missing functionality.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib.h> and defines any missing functionality.
The implementation of a fiber condition variable.
Definition: fiber_exec.c:188
struct sllist queue
The queue of fibers waiting for the condition variable to be signaled.
Definition: fiber_exec.c:197
mtx_t mtx
The mutex protecting queue.
Definition: fiber_exec.c:191
A synchronization primitive (similar to the standard C11 condition variable) that can be used to bloc...
Definition: fiber_exec.h:92
The context of a wait operation on a fiber condition variable.
Definition: fiber_exec.c:174
struct slnode node
The node in the queue of fibers waiting on cond.
Definition: fiber_exec.c:176
ev_fiber_cnd_t * cond
A pointer to the condition variable passed to ev_fiber_cnd_wait().
Definition: fiber_exec.c:178
ev_fiber_mtx_t * mtx
A pointer to the mutex passed to ev_fiber_cnd_wait().
Definition: fiber_exec.c:180
struct ev_task * task
A pointer to the task (in ev_fiber_ctx) used to wake up the fiber.
Definition: fiber_exec.c:182
The context of a fiber used for executing tasks.
Definition: fiber_exec.c:127
struct sllist queue
The queue of deferred tasks.
Definition: fiber_exec.c:137
fiber_t * fiber
A pointer to the fiber containing this context.
Definition: fiber_exec.c:129
struct ev_fiber_ctx * next
A pointer to the next fiber in the list of unused fibers.
Definition: fiber_exec.c:131
struct ev_task task
The task used to resume the fiber.
Definition: fiber_exec.c:135
struct ev_fiber_exec * exec
The executor using this fiber.
Definition: fiber_exec.c:133
The implementation of a fiber executor.
Definition: fiber_exec.c:95
struct ev_fiber_thrd * thr
A pointer to the ev_fiber_thrd instance for this executor.
Definition: fiber_exec.c:103
ev_exec_t * inner_exec
A pointer to the inner executor.
Definition: fiber_exec.c:99
struct sllist queue
The queue of tasks submitted to this executor.
Definition: fiber_exec.c:113
int posted
A flag indicating whether task has been posted to inner_exec.
Definition: fiber_exec.c:111
struct ev_task task
The task used to create new fibers.
Definition: fiber_exec.c:101
const struct ev_exec_vtbl * exec_vptr
A pointer to the virtual table for the executor interface.
Definition: fiber_exec.c:97
mtx_t mtx
The mutex protecting posted and queue.
Definition: fiber_exec.c:108
size_t pending
The number of pending fibers available to execute a task.
Definition: fiber_exec.c:105
The implementation of a fiber mutex.
Definition: fiber_exec.c:153
int type
The type of mutex: ev_fiber_mtx_plain or ev_fiber_mtx_recursive.
Definition: fiber_exec.c:155
struct sllist queue
The queue of fibers waiting to acquire the lock.
Definition: fiber_exec.c:165
struct ev_fiber_ctx * ctx
A pointer to the fiber holding the lock.
Definition: fiber_exec.c:161
size_t locked
The number of times the mutex has been recursively locked.
Definition: fiber_exec.c:163
mtx_t mtx
The mutex protecting locked and queue.
Definition: fiber_exec.c:158
A synchronization primitive (similar to the standard C11 mutex) that can be used to protect shared data...
Definition: fiber_exec.h:82
The parameters used for creating fibers on this thread and the list of unused fibers.
Definition: fiber_exec.c:52
size_t max_unused
The maximum number of unused fibers for this thread.
Definition: fiber_exec.c:64
size_t num_unused
The number of unused fibers.
Definition: fiber_exec.c:68
size_t stack_size
The size (in bytes) of the stack frame allocated for each fiber.
Definition: fiber_exec.c:62
struct ev_fiber_ctx * curr
A pointer to the currently running fiber.
Definition: fiber_exec.c:70
struct ev_fiber_ctx * unused
The list of unused fibers.
Definition: fiber_exec.c:66
fiber_t * prev
A pointer to the previously running (suspended) fiber.
Definition: fiber_exec.c:72
size_t refcnt
The number of invocations of ev_fiber_thrd_init() minus the number of invocations of ev_fiber_thrd_fini().
Definition: fiber_exec.c:58
int flags
The flags used when creating each fiber.
Definition: fiber_exec.c:60
A future.
Definition: future.c:63
An executable task.
Definition: task.h:41
ev_task_func_t * func
The function to be invoked when the task is run.
Definition: task.h:45
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A singly-linked list.
Definition: sllist.h:51
A node in a singly-linked list.
Definition: sllist.h:39
This header file is part of the event library; it contains the task declarations.
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads.h> and defines any missing functionality.
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
@ thrd_nomem
Indicates that the requested operation failed because it was unable to allocate memory.
Definition: threads.h:138
@ thrd_error
Indicates that the requested operation failed.
Definition: threads.h:123
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109