Lely core libraries  2.3.4
fiber_exec.c
Go to the documentation of this file.
1 
24 #include "ev.h"
25 
26 #if !LELY_NO_MALLOC
27 
28 #if !LELY_NO_THREADS
29 #include <lely/libc/threads.h>
30 #endif
31 #include <lely/ev/exec.h>
32 #include <lely/ev/fiber_exec.h>
33 #include <lely/ev/task.h>
34 #include <lely/util/errnum.h>
35 #include <lely/util/util.h>
36 
37 #include <assert.h>
38 #include <stdint.h>
39 #include <stdlib.h>
40 
41 #ifndef LELY_EV_FIBER_MAX_UNUSED
43 #define LELY_EV_FIBER_MAX_UNUSED 16
44 #endif
45 
46 struct ev_fiber_ctx;
47 
// State shared by all fiber executors; every function in this file reaches it
// through &ev_fiber_thrd.
// NOTE(review): listing lines 55, 69, 75 and 76 (the declaration used when
// threads are enabled, the `unused` and `prev` members and the closing line)
// are missing from this extract; presumably the threaded variant is declared
// _Thread_local -- confirm against the original file.
52 #if LELY_NO_THREADS
53 static struct ev_fiber_thrd {
54 #else
56 #endif
// Incremented by ev_fiber_thrd_init() and decremented by ev_fiber_thrd_fini().
61  size_t refcnt;
// The flags given to ev_fiber_thrd_init(); passed to fiber_create().
63  int flags;
// The stack size given to ev_fiber_thrd_init(); passed to fiber_create().
65  size_t stack_size;
// The upper bound on num_unused enforced by ev_fiber_ctx_destroy().
67  size_t max_unused;
// The number of contexts currently on the `unused` list.
71  size_t num_unused;
// The context of the fiber being run: set in ev_fiber_ctx_task_func() and
// cleared again when that fiber suspends or returns.
73  struct ev_fiber_ctx *curr;
77 
78 static void ev_fiber_exec_on_task_init(ev_exec_t *exec);
79 static void ev_fiber_exec_on_task_fini(ev_exec_t *exec);
80 static int ev_fiber_exec_dispatch(ev_exec_t *exec, struct ev_task *task);
81 static void ev_fiber_exec_post(ev_exec_t *exec, struct ev_task *task);
82 static void ev_fiber_exec_defer(ev_exec_t *exec, struct ev_task *task);
83 static size_t ev_fiber_exec_abort(ev_exec_t *exec, struct ev_task *task);
84 
// The virtual table installed in every fiber executor by ev_fiber_exec_init().
// The entries are positional; the last slot is left NULL (its meaning is not
// visible in this listing).
85 // clang-format off
86 static const struct ev_exec_vtbl ev_fiber_exec_vtbl = {
87  &ev_fiber_exec_on_task_init,
88  &ev_fiber_exec_on_task_fini,
89  &ev_fiber_exec_dispatch,
90  &ev_fiber_exec_post,
91  &ev_fiber_exec_defer,
92  &ev_fiber_exec_abort,
93  NULL
94 };
95 // clang-format on
96 
// The implementation of a fiber executor.
// NOTE(review): the members `inner_exec` (line 102), `thr` (line 106) and
// `mtx` (between lines 109 and 112) are used throughout this file but their
// declarations are missing from this extract.
98 struct ev_fiber_exec {
// A pointer to the virtual table for the executor interface. The address of
// this member is the ev_exec_t * handed to callers.
100  const struct ev_exec_vtbl *exec_vptr;
// The task used to create new fibers (runs ev_fiber_exec_func()).
104  struct ev_task task;
// The number of fibers posted to the inner executor that have not yet looked
// at the queue: incremented in ev_fiber_exec_post_ctx(), decremented in
// ev_fiber_ctx_fiber_func(). Accessed without holding `mtx`.
108  size_t pending;
109 #if !LELY_NO_THREADS
112 #endif
// A flag indicating whether `task` has been posted to the inner executor.
114  int posted;
// The queue of tasks submitted to this executor.
116  struct sllist queue;
117 };
118 
119 static void ev_fiber_exec_func(struct ev_task *task);
120 
121 static inline struct ev_fiber_exec *ev_fiber_exec_from_exec(
122  const ev_exec_t *exec);
123 
124 static int ev_fiber_exec_post_ctx(struct ev_fiber_exec *exec);
125 
// The context of a fiber used for executing tasks. It lives in the data region
// of the fiber itself (see fiber_data() in ev_fiber_ctx_create()).
// NOTE(review): the members `fiber` (line 132), `next` (line 134) and `exec`
// (line 136) are missing from this extract.
130 struct ev_fiber_ctx {
// The task used to resume the fiber (runs ev_fiber_ctx_task_func()).
138  struct ev_task task;
// The queue of deferred tasks (filled by ev_fiber_exec_defer()).
140  struct sllist queue;
141 };
142 
143 static struct ev_fiber_ctx *ev_fiber_ctx_create(struct ev_fiber_exec *exec);
144 static void ev_fiber_ctx_destroy(struct ev_fiber_ctx *ctx);
145 
146 static fiber_t *ev_fiber_ctx_fiber_func(fiber_t *fiber, void *arg);
147 static void ev_fiber_ctx_task_func(struct ev_task *task);
148 
149 static struct ev_fiber_ctx *ev_fiber_resume(fiber_t *fiber);
150 static fiber_t *ev_fiber_await_func(fiber_t *fiber, void *arg);
151 
152 static void ev_fiber_return(void);
153 static fiber_t *ev_fiber_return_func(fiber_t *fiber, void *arg);
154 
// Members of struct ev_fiber_mtx_impl, the object behind ev_fiber_mtx_t::_impl.
// NOTE(review): the struct header (line 157) and the `mtx` member (between
// lines 159 and 162) are missing from this extract.
// The `type` argument given to ev_fiber_mtx_init().
158  int type;
159 #if !LELY_NO_THREADS
162 #endif
// The context of the fiber currently holding the mutex, or NULL.
164  struct ev_fiber_ctx *ctx;
// The lock count: 0 if unlocked, 1 if locked, more than 1 only for a
// recursively locked mutex.
166  size_t locked;
// The queue of tasks of fibers waiting to acquire the mutex.
168  struct sllist queue;
169 };
170 
171 static fiber_t *ev_fiber_mtx_lock_func(fiber_t *fiber, void *arg);
172 
// Members of struct ev_fiber_cnd_wait, the context of a wait operation on a
// fiber condition variable.
// NOTE(review): the struct header (line 177) and the `cond` (line 181) and
// `mtx` (line 183) members are missing from this extract.
// The node in the queue of fibers waiting on the condition variable.
179  struct slnode node;
// A pointer to the task (in ev_fiber_ctx) used to wake up the fiber.
185  struct ev_task *task;
186 };
187 
188 static fiber_t *ev_fiber_cnd_wait_func(fiber_t *fiber, void *arg);
189 
// Members of struct ev_fiber_cnd_impl, the implementation of a fiber condition
// variable.
// NOTE(review): the struct header (line 191) and the `mtx` member (line 194,
// the mutex protecting `queue`) are missing from this extract.
192 #if !LELY_NO_THREADS
195 #endif
// The queue of fibers waiting for the condition variable to be signaled.
200  struct sllist queue;
201 };
202 
203 static void ev_fiber_cnd_wake(struct slnode *node);
204 
// ev_fiber_thrd_init(int flags, size_t stack_size, size_t max_unused):
// initializes the calling thread for use by fiber executors.
// Returns 1 if ev_fiber_thrd_init() was already in effect (only the reference
// count is bumped), otherwise the result of fiber_thrd_init(); when that
// result is -1 the reference count is rolled back.
// NOTE(review): the signature line (206) is missing from this extract.
205 int
207 {
208  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
209 
210  if (thr->refcnt++)
211  return 1;
212 
// FIBER_GUARD_STACK is masked out here, but the unmasked flags are stored
// below and later passed to fiber_create().
213  int result = fiber_thrd_init(flags & ~FIBER_GUARD_STACK);
214  if (result == -1) {
215  thr->refcnt--;
216  } else {
217  thr->flags = flags;
218  thr->stack_size = stack_size;
219 
// NOTE(review): listing line 221, the body of this `if`, is missing from this
// extract; presumably it falls back to LELY_EV_FIBER_MAX_UNUSED when
// max_unused is 0 -- confirm. The assignment to thr->unused below is NOT the
// body of the `if` in the original file.
220  if (!(thr->max_unused = max_unused))
222  thr->unused = NULL;
223  thr->num_unused = 0;
224 
225  thr->curr = NULL;
226  thr->prev = NULL;
227  }
228  return result;
229 }
230 
// ev_fiber_thrd_fini(void): finalizes the calling thread and prevents further
// use by fiber executors. Only the call that drops the reference count to 0
// destroys the fibers on the unused list and calls fiber_thrd_fini().
// Must not be called while a fiber is running (thr->curr is asserted NULL).
// NOTE(review): the signature line (232) is missing from this extract.
231 void
233 {
234  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
235  assert(!thr->curr);
236 
237  if (--thr->refcnt)
238  return;
239 
// Unlink each context before destroying its fiber; the context is read from
// the fiber (fiber_data() in ev_fiber_ctx_create()), so `next` is fetched
// first.
240  struct ev_fiber_ctx *ctx;
241  while ((ctx = thr->unused)) {
242  thr->unused = ctx->next;
243  fiber_destroy(ctx->fiber);
244  }
245 
246  fiber_thrd_fini();
247 }
248 
// Allocates (but does not initialize) the memory for a fiber executor.
// Returns a pointer to the exec_vptr member of the new object, i.e. the value
// to pass to ev_fiber_exec_init(), or NULL if malloc() fails (in which case the
// error code is set from errno unless LELY_NO_ERRNO is defined).
249 void *
250 ev_fiber_exec_alloc(void)
251 {
252  struct ev_fiber_exec *exec = malloc(sizeof(*exec));
253  if (!exec) {
254 #if !LELY_NO_ERRNO
255  set_errc(errno2c(errno));
256 #endif
257  return NULL;
258  }
259  // Suppress a GCC maybe-uninitialized warning.
260  exec->exec_vptr = NULL;
261  // cppcheck-suppress memleak symbolName=exec
262  return &exec->exec_vptr;
263 }
264 
// Frees memory obtained from ev_fiber_exec_alloc(). `ptr` is the executor
// pointer (the address of exec_vptr), not the start of the allocation, so it
// is converted back before calling free(). A NULL `ptr` is ignored; the check
// is required because ev_fiber_exec_from_exec() asserts a non-NULL argument.
265 void
266 ev_fiber_exec_free(void *ptr)
267 {
268  if (ptr)
269  free(ev_fiber_exec_from_exec(ptr));
270 }
271 
// Initializes a fiber executor in memory obtained from ev_fiber_exec_alloc().
// exec_:      the executor to initialize.
// inner_exec: the executor to which fibers are posted; must not be NULL.
// ev_fiber_thrd_init() must be in effect (refcnt is asserted non-zero).
// Returns exec_ on success, or NULL if the mutex cannot be initialized.
272 ev_exec_t *
273 ev_fiber_exec_init(ev_exec_t *exec_, ev_exec_t *inner_exec)
274 {
275  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
276  assert(thr->refcnt);
277  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
278  assert(inner_exec);
279 
280  exec->exec_vptr = &ev_fiber_exec_vtbl;
281 
282  exec->inner_exec = inner_exec;
283  exec->task = (struct ev_task)EV_TASK_INIT(
284  exec->inner_exec, &ev_fiber_exec_func);
285 
// Remember the ev_fiber_thrd instance in effect during initialization;
// ev_fiber_exec_post() compares it with the caller's.
286  exec->thr = thr;
287 
288  exec->pending = 0;
289 
290 #if !LELY_NO_THREADS
291  if (mtx_init(&exec->mtx, mtx_plain) != thrd_success)
292  return NULL;
293 #endif
294 
295  exec->posted = 0;
296 
297  sllist_init(&exec->queue);
298 
299  return exec_;
300 }
301 
// Finalizes a fiber executor: aborts all tasks still queued on it, aborts the
// pending ev_fiber_exec_func() task on the inner executor (if posted) and
// destroys the mutex. The executor's `thr` is asserted to equal
// &ev_fiber_thrd as seen by the caller.
302 void
303 ev_fiber_exec_fini(ev_exec_t *exec_)
304 {
305  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
306  assert(exec->thr == &ev_fiber_thrd);
307 
// A NULL task aborts every task queued on this executor.
308  ev_fiber_exec_abort(exec_, NULL);
309 
310 #if !LELY_NO_THREADS
311  mtx_lock(&exec->mtx);
312 #endif
313  // Abort ev_fiber_exec_func().
314  if (exec->posted && ev_exec_abort(exec->task.exec, &exec->task))
315  exec->posted = 0;
// If the abort did not find the task, `posted` is still set and this assertion
// fires.
316  assert(exec->posted == 0);
317 #if !LELY_NO_THREADS
318  mtx_unlock(&exec->mtx);
319 
320  mtx_destroy(&exec->mtx);
321 #endif
322 }
323 
// ev_fiber_exec_create(ev_exec_t *inner_exec): creates a fiber executor by
// combining ev_fiber_exec_alloc() and ev_fiber_exec_init().
// Returns the new executor, or NULL on error. On error the error code of the
// failing step is captured before cleanup and restored with set_errc()
// afterwards.
// NOTE(review): the signature line (325) is missing from this extract.
324 ev_exec_t *
326 {
327  int errc = 0;
328 
329  ev_exec_t *exec = ev_fiber_exec_alloc();
330  if (!exec) {
331  errc = get_errc();
332  goto error_alloc;
333  }
334 
// Keep `exec` intact until init succeeds so that the error path can free it.
335  ev_exec_t *tmp = ev_fiber_exec_init(exec, inner_exec);
336  if (!tmp) {
337  errc = get_errc();
338  goto error_init;
339  }
340  exec = tmp;
341 
342  return exec;
343 
344 error_init:
345  ev_fiber_exec_free((void *)exec);
346 error_alloc:
347  set_errc(errc);
348  return NULL;
349 }
350 
// ev_fiber_exec_destroy(ev_exec_t *exec): destroys a fiber executor by
// finalizing it and freeing its memory. A NULL `exec` is ignored.
// NOTE(review): the signature line (352) is missing from this extract.
351 void
353 {
354  if (exec) {
355  ev_fiber_exec_fini(exec);
356  ev_fiber_exec_free((void *)exec);
357  }
358 }
359 
// ev_fiber_exec_get_inner_exec(const ev_exec_t *exec_): returns a pointer to
// the inner executor of a fiber executor.
// NOTE(review): the signature line (361) is missing from this extract.
360 ev_exec_t *
362 {
363  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
364 
365  return exec->inner_exec;
366 }
367 
// ev_fiber_await(ev_future_t *future): suspends the currently running fiber
// until the specified future becomes ready (or is cancelled). With a NULL
// future the fiber is simply re-posted (see ev_fiber_await_func()), which
// acts as a yield. Must be called from a fiber (thr->curr is asserted).
// NOTE(review): the signature line (369) is missing from this extract.
368 void
370 {
371  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
372  assert(thr->curr);
373 
// Switch to thr->prev with ev_fiber_await_func as the function argument; the
// fiber returned when this one is resumed becomes the new `prev`.
374  thr->prev = fiber_resume_with(thr->prev, ev_fiber_await_func, future);
375 }
376 
// ev_fiber_mtx_init(ev_fiber_mtx_t *mtx, int type): creates a fiber mutex.
// Any bit in `type` other than ev_fiber_mtx_recursive is rejected.
// Returns ev_fiber_success, ev_fiber_error (invalid type or mutex
// initialization failure) or ev_fiber_nomem (allocation failure).
// mtx->_impl is only written on success.
// NOTE(review): the signature line (378) and listing line 383 are missing from
// this extract; line 383 presumably sets an error number -- confirm.
377 int
379 {
380  assert(mtx);
381 
382  if (type & ~ev_fiber_mtx_recursive) {
384  return ev_fiber_error;
385  }
386 
387  struct ev_fiber_mtx_impl *impl = malloc(sizeof(*impl));
388  if (!impl)
389  return ev_fiber_nomem;
390 
391  impl->type = type;
392 
393 #if !LELY_NO_THREADS
// The inner mutex is always plain, regardless of `type`.
394  switch (mtx_init(&impl->mtx, mtx_plain)) {
395  case thrd_error: free(impl); return ev_fiber_error;
396  case thrd_nomem: free(impl); return ev_fiber_nomem;
397  default: break;
398  }
399 #endif
400 
401  impl->ctx = NULL;
402  impl->locked = 0;
403  sllist_init(&impl->queue);
404 
405  mtx->_impl = impl;
406 
407  return ev_fiber_success;
408 }
409 
// ev_fiber_mtx_destroy(ev_fiber_mtx_t *mtx): releases any resources used by
// the fiber mutex at mtx and resets mtx->_impl to NULL. The mutex must be
// unlocked and have no waiting fibers (both are asserted).
// NOTE(review): the signature line (411) is missing from this extract.
410 void
412 {
413  assert(mtx);
414  struct ev_fiber_mtx_impl *impl = mtx->_impl;
415  assert(impl);
416 
417  assert(!impl->locked);
418  assert(sllist_empty(&impl->queue));
419 #if !LELY_NO_THREADS
420  mtx_destroy(&impl->mtx);
421 #endif
422  free(impl);
423 
424  mtx->_impl = NULL;
425 }
426 
// ev_fiber_mtx_lock(ev_fiber_mtx_t *mtx): suspends the currently running fiber
// until it locks the fiber mutex at mtx.
// Returns ev_fiber_success, or ev_fiber_error if not called from a fiber or if
// the calling fiber already holds a non-recursive mutex.
// NOTE(review): the signature line (428) and listing lines 436 and 449 are
// missing from this extract; presumably they set an error number -- confirm.
427 int
429 {
430  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
431  assert(mtx);
432  struct ev_fiber_mtx_impl *impl = mtx->_impl;
433  assert(impl);
434 
435  if (!thr->curr) {
437  return ev_fiber_error;
438  }
439 
440 #if !LELY_NO_THREADS
441  mtx_lock(&impl->mtx);
442 #endif
443  if (impl->locked) {
444  if (impl->ctx == thr->curr) {
// The mutex is already held by the calling fiber.
445  if (!(impl->type & ev_fiber_mtx_recursive)) {
446 #if !LELY_NO_THREADS
447  mtx_unlock(&impl->mtx);
448 #endif
450  return ev_fiber_error;
451  }
452  assert(impl->locked < SIZE_MAX);
453  impl->locked++;
// NOTE(review): no mtx_unlock(&impl->mtx) is visible on this recursive path in
// the listing, unlike the corresponding path in ev_fiber_mtx_trylock() --
// verify against the original file.
454  } else {
455  // The inner mutex will be unlocked in
456  // ev_fiber_mtx_lock_func().
457  thr->prev = fiber_resume_with(thr->prev,
458  ev_fiber_mtx_lock_func, impl);
// Execution continues here once the fiber's task is posted again;
// ev_fiber_mtx_unlock() and ev_fiber_cnd_wait_func() set `locked` to 1 before
// posting a task taken from the queue, so only the owner has to be recorded.
459  impl->ctx = thr->curr;
460  assert(impl->locked == 1);
461  }
462  } else {
463  assert(impl->ctx == NULL);
464  assert(sllist_empty(&impl->queue));
465  impl->ctx = thr->curr;
466  impl->locked = 1;
467 #if !LELY_NO_THREADS
468  mtx_unlock(&impl->mtx);
469 #endif
470  }
471 
472  return ev_fiber_success;
473 }
474 
// ev_fiber_mtx_trylock(ev_fiber_mtx_t *mtx): endeavors to lock the fiber mutex
// at mtx without suspending the calling fiber.
// Returns ev_fiber_success if the mutex was acquired (or re-acquired
// recursively), ev_fiber_busy if another fiber holds it, or ev_fiber_error if
// not called from a fiber or if the calling fiber already holds a
// non-recursive mutex.
// NOTE(review): the signature line (476) and listing lines 484 and 497 are
// missing from this extract; presumably they set an error number -- confirm.
475 int
477 {
478  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
479  assert(mtx);
480  struct ev_fiber_mtx_impl *impl = mtx->_impl;
481  assert(impl);
482 
483  if (!thr->curr) {
485  return ev_fiber_error;
486  }
487 
488 #if !LELY_NO_THREADS
489  mtx_lock(&impl->mtx);
490 #endif
491  if (impl->locked) {
492  if (impl->ctx == thr->curr) {
493  if (!(impl->type & ev_fiber_mtx_recursive)) {
494 #if !LELY_NO_THREADS
495  mtx_unlock(&impl->mtx);
496 #endif
498  return ev_fiber_error;
499  }
500  assert(impl->locked < SIZE_MAX);
501  impl->locked++;
502 #if !LELY_NO_THREADS
503  mtx_unlock(&impl->mtx);
504 #endif
505  return ev_fiber_success;
506  } else {
507 #if !LELY_NO_THREADS
508  mtx_unlock(&impl->mtx);
509 #endif
510  return ev_fiber_busy;
511  }
512  } else {
513  assert(impl->ctx == NULL);
514  assert(sllist_empty(&impl->queue));
515  impl->ctx = thr->curr;
516  impl->locked = 1;
517 #if !LELY_NO_THREADS
518  mtx_unlock(&impl->mtx);
519 #endif
520  return ev_fiber_success;
521  }
522 }
523 
// ev_fiber_mtx_unlock(ev_fiber_mtx_t *mtx): unlocks the fiber mutex at mtx.
// Returns ev_fiber_success, or ev_fiber_error if not called from a fiber or if
// the calling fiber does not hold the mutex.
// NOTE(review): the signature line (525) and listing lines 533 and 546 are
// missing from this extract; presumably they set an error number -- confirm.
524 int
526 {
527  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
528  assert(mtx);
529  struct ev_fiber_mtx_impl *impl = mtx->_impl;
530  assert(impl);
531 
532  if (!thr->curr) {
534  return ev_fiber_error;
535  }
536 
537  struct ev_task *task = NULL;
538 
539 #if !LELY_NO_THREADS
540  mtx_lock(&impl->mtx);
541 #endif
542  if (impl->ctx != thr->curr || !impl->locked) {
543 #if !LELY_NO_THREADS
544  mtx_unlock(&impl->mtx);
545 #endif
547  return ev_fiber_error;
548  }
549  assert((impl->type & ev_fiber_mtx_recursive) || impl->locked == 1);
550  if (!--impl->locked) {
// The last lock level was released. If a fiber is waiting, hand the mutex over
// directly: `locked` goes back to 1 here and the woken fiber records itself as
// owner when it resumes in ev_fiber_mtx_lock().
551  impl->ctx = NULL;
552  if ((task = ev_task_from_node(sllist_pop_front(&impl->queue))))
553  impl->locked++;
554  }
555 #if !LELY_NO_THREADS
556  mtx_unlock(&impl->mtx);
557 #endif
558 
// Wake the next owner only after the inner mutex has been released.
559  if (task)
560  ev_exec_post(task->exec, task);
561 
562  return ev_fiber_success;
563 }
564 
// ev_fiber_cnd_init(ev_fiber_cnd_t *cond): creates a fiber condition variable.
// Returns ev_fiber_success, ev_fiber_nomem (allocation failure) or
// ev_fiber_error (mutex initialization failure). cond->_impl is only written
// on success.
// NOTE(review): the signature line (566) is missing from this extract.
565 int
567 {
568  assert(cond);
569 
570  struct ev_fiber_cnd_impl *impl = malloc(sizeof(*impl));
571  if (!impl)
572  return ev_fiber_nomem;
573 
574 #if !LELY_NO_THREADS
575  switch (mtx_init(&impl->mtx, mtx_plain)) {
576  case thrd_error: free(impl); return ev_fiber_error;
577  case thrd_nomem: free(impl); return ev_fiber_nomem;
578  default: break;
579  }
580 #endif
581 
582  sllist_init(&impl->queue);
583 
584  cond->_impl = impl;
585 
586  return ev_fiber_success;
587 }
588 
// ev_fiber_cnd_destroy(ev_fiber_cnd_t *cond): releases all resources used by
// the fiber condition variable at cond and resets cond->_impl to NULL. No
// fibers may be waiting on it (asserted).
// NOTE(review): the signature line (590) is missing from this extract.
589 void
591 {
592  assert(cond);
593  struct ev_fiber_cnd_impl *impl = cond->_impl;
594  assert(impl);
595 
596  assert(sllist_empty(&impl->queue));
597 #if !LELY_NO_THREADS
598  mtx_destroy(&impl->mtx);
599 #endif
600  free(impl);
601 
602  cond->_impl = NULL;
603 }
604 
// ev_fiber_cnd_signal(ev_fiber_cnd_t *cond): unblocks one of the fibers
// waiting on the condition variable at cond (the one at the front of the
// queue), if any. Always returns ev_fiber_success.
// NOTE(review): the signature line (606) is missing from this extract.
605 int
607 {
608  assert(cond);
609  struct ev_fiber_cnd_impl *impl = cond->_impl;
610  assert(impl);
611 
612 #if !LELY_NO_THREADS
613  mtx_lock(&impl->mtx);
614 #endif
615  struct slnode *node = sllist_pop_front(&impl->queue);
616 #if !LELY_NO_THREADS
617  mtx_unlock(&impl->mtx);
618 #endif
619 
// The waiter is woken after the queue mutex is released; ev_fiber_cnd_wake()
// takes the mutex of the associated fiber mutex instead.
620  if (node)
621  ev_fiber_cnd_wake(node);
622 
623  return ev_fiber_success;
624 }
625 
// ev_fiber_cnd_broadcast(ev_fiber_cnd_t *cond): unblocks all fibers waiting on
// the condition variable at cond. Always returns ev_fiber_success.
// NOTE(review): the signature line (627) is missing from this extract.
626 int
628 {
629  assert(cond);
630  struct ev_fiber_cnd_impl *impl = cond->_impl;
631  assert(impl);
632 
633  struct sllist queue;
634  sllist_init(&queue);
635 
// Move all current waiters to a local list while holding the mutex, so that
// they can be woken without it.
636 #if !LELY_NO_THREADS
637  mtx_lock(&impl->mtx);
638 #endif
639  sllist_append(&queue, &impl->queue);
640 #if !LELY_NO_THREADS
641  mtx_unlock(&impl->mtx);
642 #endif
643 
644  struct slnode *node;
645  while ((node = sllist_pop_front(&queue)))
646  ev_fiber_cnd_wake(node);
647 
648  return ev_fiber_success;
649 }
650 
// ev_fiber_cnd_wait(ev_fiber_cnd_t *cond, ev_fiber_mtx_t *mtx): unlocks the
// fiber mutex at mtx, suspends the calling fiber until it is woken through
// the condition variable at cond, and returns with the mutex locked again.
// Returns ev_fiber_success, or ev_fiber_error if not called from a fiber or if
// the calling fiber does not hold the mutex exactly once (a recursively held
// mutex is rejected).
// NOTE(review): the signature line (652) and listing lines 660 and 673 are
// missing from this extract; presumably they set an error number -- confirm.
651 int
653 {
654  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
655  assert(mtx);
656  struct ev_fiber_mtx_impl *impl = mtx->_impl;
657  assert(impl);
658 
659  if (!thr->curr) {
661  return ev_fiber_error;
662  }
663 
// The wait context lives on this fiber's stack; its node is what gets queued
// on the condition variable.
664  struct ev_fiber_cnd_wait wait = { .cond = cond, .mtx = mtx };
665 
666 #if !LELY_NO_THREADS
667  mtx_lock(&impl->mtx);
668 #endif
669  if (impl->ctx != thr->curr || impl->locked != 1) {
670 #if !LELY_NO_THREADS
671  mtx_unlock(&impl->mtx);
672 #endif
674  return ev_fiber_error;
675  }
676  assert(impl->locked);
677 
678  // The inner mutex will be unlocked in ev_fiber_cnd_wait_func().
679  thr->prev = fiber_resume_with(thr->prev, ev_fiber_cnd_wait_func, &wait);
680 
// Resumed: `locked` was already set to 1 on this fiber's behalf by whoever
// posted its task (ev_fiber_cnd_wake() or a mutex hand-over); only the owner
// remains to be recorded.
681 #if !LELY_NO_THREADS
682  mtx_lock(&impl->mtx);
683 #endif
684  assert(!impl->ctx);
685  impl->ctx = thr->curr;
686  assert(impl->locked == 1);
687 #if !LELY_NO_THREADS
688  mtx_unlock(&impl->mtx);
689 #endif
690 
691  return ev_fiber_success;
692 }
693 
// The on_task_init entry of ev_fiber_exec_vtbl.
// NOTE(review): the only statement of the body (listing line 699) is missing
// from this extract; presumably it forwards to ev_exec_on_task_init() on the
// inner executor -- confirm against the original file.
694 static void
695 ev_fiber_exec_on_task_init(ev_exec_t *exec_)
696 {
697  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
698 
700 }
701 
// The on_task_fini entry of ev_fiber_exec_vtbl.
// NOTE(review): the only statement of the body (listing line 707) is missing
// from this extract; presumably it forwards to ev_exec_on_task_fini() on the
// inner executor -- confirm against the original file.
702 static void
703 ev_fiber_exec_on_task_fini(ev_exec_t *exec_)
704 {
705  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
706 
708 }
709 
// The dispatch entry of ev_fiber_exec_vtbl: runs `task` immediately if the
// caller is itself executing in a fiber of this executor, and posts it
// otherwise.
// Returns 1 if the task was executed by this call, 0 if it was posted.
710 static int
711 ev_fiber_exec_dispatch(ev_exec_t *exec_, struct ev_task *task)
712 {
713  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
714  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
715  assert(task);
716  assert(!task->exec || task->exec == exec_);
717 
718  // Post the task if the executor is not currently executing a fiber.
719  struct ev_fiber_ctx *ctx = thr->curr;
720  if (!ctx || ctx->exec != exec) {
721  ev_fiber_exec_post(exec_, task);
722  return 0;
723  }
724 
725  if (!task->exec)
726  task->exec = exec_;
727 
// No on_task_init()/on_task_fini() pair is issued on this path.
728  // Execute the task immediately.
729  if (task->func)
730  task->func(task);
731 
732  return 1;
733 }
734 
// The post entry of ev_fiber_exec_vtbl: appends `task` to the executor's queue
// and makes sure a fiber will eventually pick it up.
//  - Called with the executor's own ev_fiber_thrd (exec->thr == thr): a new
//    fiber is posted directly when none is pending.
//  - Otherwise: exec->task (ev_fiber_exec_func()) is posted to the inner
//    executor, unless it already is.
735 static void
736 ev_fiber_exec_post(ev_exec_t *exec_, struct ev_task *task)
737 {
738  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
739  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
740  assert(task);
741  assert(!task->exec || task->exec == exec_);
742 
743  if (!task->exec)
744  task->exec = exec_;
745  ev_fiber_exec_on_task_init(exec_);
746 
747  // Append the task to the queue.
748 #if !LELY_NO_THREADS
749  mtx_lock(&exec->mtx);
750 #endif
751  sllist_push_back(&exec->queue, &task->_node);
// Decide under the lock whether exec->task has to be posted, and claim the
// `posted` flag, so that it is posted at most once at a time.
752  int post = !exec->posted && exec->thr != thr;
753  if (post)
754  exec->posted = 1;
755 #if !LELY_NO_THREADS
756  mtx_unlock(&exec->mtx);
757 #endif
758 
759  // If no pending fibers are available (and the calling thread is the
760  // thread of this executor), try to post a new fiber immediately.
761  if (exec->thr == thr && !exec->pending)
762  ev_fiber_exec_post_ctx(exec);
763  // Otherwise, post a task to post a fiber.
764  if (post)
765  ev_exec_post(exec->task.exec, &exec->task);
766 }
767 
// The defer entry of ev_fiber_exec_vtbl: if the caller is executing in a fiber
// of this executor, `task` is appended to that fiber's own deferred queue;
// otherwise it is posted like any other task.
768 static void
769 ev_fiber_exec_defer(ev_exec_t *exec_, struct ev_task *task)
770 {
771  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
772  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
773  assert(task);
774  assert(!task->exec || task->exec == exec_);
775 
776  // Post the task if the executor is not currently executing a fiber.
777  struct ev_fiber_ctx *ctx = thr->curr;
778  if (!ctx || ctx->exec != exec) {
779  ev_fiber_exec_post(exec_, task);
780  return;
781  }
782 
783  if (!task->exec)
784  task->exec = exec_;
785  ev_fiber_exec_on_task_init(exec_);
786 
// The deferred queue is accessed without taking exec->mtx.
787  // Push the task to the deferred queue of the fiber.
788  sllist_push_back(&ctx->queue, &task->_node);
789 }
790 
// The abort entry of ev_fiber_exec_vtbl: removes `task` (or, if `task` is
// NULL, all tasks) from the deferred queue of the current fiber (when it
// belongs to this executor) and from the executor's queue.
// Returns the number of tasks removed, saturating at SIZE_MAX. The tasks'
// functions are not invoked.
791 static size_t
792 ev_fiber_exec_abort(ev_exec_t *exec_, struct ev_task *task)
793 {
794  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
795  struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
796 
// Removed tasks are collected here and counted after the lock is released.
797  struct sllist queue;
798  sllist_init(&queue);
799 
800  struct ev_fiber_ctx *ctx = thr->curr;
801  if (ctx && ctx->exec == exec) {
802  // Try to find and abort the task(s) in the deferred queue of
803  // the fiber.
804  if (!task)
805  sllist_append(&queue, &ctx->queue);
806  else if (sllist_remove(&ctx->queue, &task->_node))
807  sllist_push_back(&queue, &task->_node);
808  }
809 
810 #if !LELY_NO_THREADS
811  mtx_lock(&exec->mtx);
812 #endif
// NOTE(review): listing line 814, the body of this `if`, is missing from this
// extract; presumably it appends exec->queue to the local queue, mirroring
// line 805 -- confirm against the original file.
813  if (!task)
815  else if (sllist_remove(&exec->queue, &task->_node))
816  sllist_push_back(&queue, &task->_node);
817 #if !LELY_NO_THREADS
818  mtx_unlock(&exec->mtx);
819 #endif
820 
// Balance the on_task_init() issued when each task was posted or deferred.
821  size_t n = 0;
822  while (sllist_pop_front(&queue)) {
823  ev_fiber_exec_on_task_fini(exec_);
824  n += n < SIZE_MAX;
825  }
826  return n;
827 }
828 
// The function of ev_fiber_exec::task. It is posted to the inner executor by
// ev_fiber_exec_post() when a task is submitted with a different
// ev_fiber_thrd than the executor's; it clears the `posted` flag and, if no
// fiber is pending, tries to post one.
829 static void
830 ev_fiber_exec_func(struct ev_task *task)
831 {
832  assert(task);
833  struct ev_fiber_exec *exec = structof(task, struct ev_fiber_exec, task);
834  assert(exec->thr == &ev_fiber_thrd);
835 
836 #if !LELY_NO_THREADS
837  mtx_lock(&exec->mtx);
838 #endif
839  assert(exec->posted);
840  exec->posted = 0;
841 #if !LELY_NO_THREADS
842  mtx_unlock(&exec->mtx);
843 #endif
844 
845  // If no pending fibers are available, try to post a new fiber.
846  if (!exec->pending)
847  ev_fiber_exec_post_ctx(exec);
848 }
849 
// Converts an executor pointer (the address of the exec_vptr member) back to
// the enclosing struct ev_fiber_exec. `exec` must not be NULL.
850 static inline struct ev_fiber_exec *
851 ev_fiber_exec_from_exec(const ev_exec_t *exec)
852 {
853  assert(exec);
854 
855  return structof(exec, struct ev_fiber_exec, exec_vptr);
856 }
857 
// Obtains a fiber context for `exec` and posts its task to the inner executor,
// incrementing exec->pending.
// Returns 0 on success, or -1 if no fiber could be created; in that case the
// error code in effect before the call is restored.
857 static int
858 ev_fiber_exec_post_ctx(struct ev_fiber_exec *exec)
859 {
860  int errsv = get_errc();
861  struct ev_fiber_ctx *ctx = ev_fiber_ctx_create(exec);
862  if (!ctx) {
863  // Ignore the error; one of the existing fibers can pick up the
864  // submitted task.
865  set_errc(errsv);
866  return -1;
867  }
868 
869  exec->pending++;
870  ev_exec_post(ctx->task.exec, &ctx->task);
871 
872  return 0;
873 }
875 
// Returns a fiber context associated with `exec`, taken from the list of
// unused fibers if possible and freshly created otherwise. Returns NULL if
// fiber_create() fails. `exec` must use the caller's ev_fiber_thrd (asserted).
875 static struct ev_fiber_ctx *
876 ev_fiber_ctx_create(struct ev_fiber_exec *exec)
877 {
878  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
879  assert(thr->refcnt);
880  assert(exec);
881  assert(exec->thr == thr);
882 
883  // Use a fiber from the unused list before creating a new one.
884  struct ev_fiber_ctx *ctx = thr->unused;
885  if (ctx) {
886  thr->unused = ctx->next;
887  ctx->next = NULL;
888  assert(thr->num_unused);
889  thr->num_unused--;
890  } else {
// The context is stored in the fiber's data region, sized here as
// sizeof(struct ev_fiber_ctx).
891  fiber_t *fiber = fiber_create(&ev_fiber_ctx_fiber_func, NULL,
892  thr->flags, sizeof(struct ev_fiber_ctx),
893  thr->stack_size);
894  if (!fiber)
895  return NULL;
896  ctx = fiber_data(fiber);
897  *ctx = (struct ev_fiber_ctx){
898  .fiber = fiber,
899  };
900  sllist_init(&ctx->queue);
901  }
902 
903  // Associate the fiber with the executor.
904  ctx->exec = exec;
905  ctx->task = (struct ev_task)EV_TASK_INIT(
906  exec->inner_exec, &ev_fiber_ctx_task_func);
907 
908  return ctx;
909 }
911 
// Releases a fiber context: it is pushed onto the front of the unused list
// while that list holds fewer than max_unused entries, and its fiber is
// destroyed otherwise. A NULL `ctx` is ignored. The deferred queue of the
// context must be empty (asserted).
911 static void
912 ev_fiber_ctx_destroy(struct ev_fiber_ctx *ctx)
913 {
914  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
915  assert(thr->refcnt);
916 
917  if (ctx) {
918  assert(sllist_empty(&ctx->queue));
919  if (thr->num_unused < thr->max_unused) {
920  ctx->next = thr->unused;
921  thr->unused = ctx;
922  thr->num_unused++;
923  } else {
924  fiber_destroy(ctx->fiber);
925  }
926  }
927 }
929 
// The entry function of every fiber created by ev_fiber_ctx_create(). It loops
// forever: take a task from the executor's queue (or a remaining deferred
// task), run it, then yield with ev_fiber_await(NULL). When the executor's
// queue is empty the fiber hands itself back via ev_fiber_return() and, if it
// is later reused, continues the loop from there.
// fiber: the fiber that resumed this one; stored as thr->prev.
// arg:   unused.
// The final `return NULL` is never reached.
929 static fiber_t *
930 ev_fiber_ctx_fiber_func(fiber_t *fiber, void *arg)
931 {
932  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
933  (void)arg;
// fiber_data(NULL) yields the data region of the calling fiber, i.e. this
// fiber's own context.
934  struct ev_fiber_ctx *ctx = fiber_data(NULL);
935 
936  thr->prev = fiber;
937 
938  struct ev_task *task = NULL;
939  for (;;) {
940  if (!task) {
941  // Check if there are any tasks waiting on the queue of
942  // the executor.
943  assert(ctx->exec->pending);
944  ctx->exec->pending--;
945 #if !LELY_NO_THREADS
946  mtx_lock(&ctx->exec->mtx);
947 #endif
948  task = ev_task_from_node(
949  sllist_pop_front(&ctx->exec->queue));
950 #if !LELY_NO_THREADS
951  mtx_unlock(&ctx->exec->mtx);
952 #endif
953  // If no tasks are available, return and destroy the
954  // fiber or put it on the unused list.
955  if (!task) {
956  ev_fiber_return();
957  continue;
958  }
959  }
960 
961  // Execute the task.
962  ev_exec_t *exec = task->exec;
963  assert(exec == &ctx->exec->exec_vptr);
964  if (task->func)
965  task->func(task);
966  ev_fiber_exec_on_task_fini(exec);
967 
968  // Check if there are any deferred tasks remaining.
// NOTE(review): listing line 970 is missing from this extract; presumably it
// reassigns `task` from the fiber's deferred queue (ctx->queue) -- confirm.
// Without it the `!task` test below could never be true at this point.
970  if (!task)
971  ctx->exec->pending++;
972 
// Yield: with a NULL future, ev_fiber_await_func() re-posts this fiber's task
// to the inner executor.
973  ev_fiber_await(NULL);
974  }
975 
976  return NULL;
977 }
979 
// The function of ev_fiber_ctx::task, executed by the inner executor: marks
// the context as current and resumes its fiber. Control comes back here when
// the fiber suspends or returns; the functions used for that in this file all
// return NULL, which is what the assertion checks.
979 static void
980 ev_fiber_ctx_task_func(struct ev_task *task)
981 {
982  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
983  assert(task);
984  struct ev_fiber_ctx *ctx = structof(task, struct ev_fiber_ctx, task);
985 
986  thr->curr = ctx;
987  fiber_t *fiber = fiber_resume(ctx->fiber);
988  assert(!fiber);
989  (void)fiber;
990 }
992 
// Common bookkeeping for the functions handed to fiber_resume_with() when a
// fiber suspends (ev_fiber_await_func(), ev_fiber_mtx_lock_func(),
// ev_fiber_cnd_wait_func()): clears thr->curr, stores `fiber` as the handle of
// the current context, and tries to post a new fiber if the executor has
// queued tasks but no pending fiber.
// Returns the context that was current on entry.
991 static struct ev_fiber_ctx *
992 ev_fiber_resume(fiber_t *fiber)
993 {
994  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
995 
996  struct ev_fiber_ctx *ctx = thr->curr;
997  thr->curr = NULL;
998  ctx->fiber = fiber;
999 
1000  // If there are tasks on the queue, but no pending fibers to execute
1001  // them, try to post a new fiber.
1002  struct ev_fiber_exec *exec = ctx->exec;
1003  if (!exec->pending) {
1004 #if !LELY_NO_THREADS
1005  mtx_lock(&exec->mtx);
1006 #endif
1007  int empty = sllist_empty(&exec->queue);
1008 #if !LELY_NO_THREADS
1009  mtx_unlock(&exec->mtx);
1010 #endif
1011  if (!empty)
1012  ev_fiber_exec_post_ctx(exec);
1013  }
1014 
1015  return ctx;
1016 }
1019 
// The function passed to fiber_resume_with() by ev_fiber_await().
// fiber: stored as the handle of the suspended fiber (see ev_fiber_resume()).
// arg:   the future to wait for, or NULL.
// With a future, the fiber's task is submitted to it; with NULL, the task is
// posted straight back to its executor. Always returns NULL.
1018 static fiber_t *
1019 ev_fiber_await_func(fiber_t *fiber, void *arg)
1020 {
1021  ev_future_t *future = arg;
1022 
1023  struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1024 
1025  if (future)
1026  ev_future_submit(future, &ctx->task);
1027  else
1028  ev_exec_post(ctx->task.exec, &ctx->task);
1029 
1030  return NULL;
1031 }
1034 
// Hands the calling fiber back: switches to thr->prev with
// ev_fiber_return_func(), which destroys the fiber or puts it on the unused
// list. If the fiber is later reused and resumed, execution continues after
// the fiber_resume_with() call below. Must be called from a fiber (asserted).
1033 static void
1034 ev_fiber_return(void)
1035 {
1036  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
1037  assert(thr->curr);
1038 
1039  thr->prev = fiber_resume_with(thr->prev, ev_fiber_return_func, NULL);
1040 }
1043 
// The function passed to fiber_resume_with() by ev_fiber_return().
// fiber: stored as the handle of the returning fiber.
// arg:   unused.
// Unlike the other suspend functions it does not go through ev_fiber_resume(),
// so no replacement fiber is posted. Always returns NULL.
1042 static fiber_t *
1043 ev_fiber_return_func(fiber_t *fiber, void *arg)
1044 {
1045  struct ev_fiber_thrd *thr = &ev_fiber_thrd;
1046  (void)arg;
1047 
1048  struct ev_fiber_ctx *ctx = thr->curr;
1049  thr->curr = NULL;
1050  ctx->fiber = fiber;
1051 
1052  // Destroy the fiber or put it back on the unused list.
1053  ev_fiber_ctx_destroy(ctx);
1054 
1055  return NULL;
1056 }
1059 
// The function passed to fiber_resume_with() by ev_fiber_mtx_lock() when the
// mutex is held by another fiber.
// fiber: stored as the handle of the suspended fiber (see ev_fiber_resume()).
// arg:   the struct ev_fiber_mtx_impl to wait for; its inner mutex is held on
//        entry (when threads are enabled).
// Queues the fiber's task on the mutex and releases the inner mutex. Always
// returns NULL.
1058 static fiber_t *
1059 ev_fiber_mtx_lock_func(fiber_t *fiber, void *arg)
1060 {
1061  struct ev_fiber_mtx_impl *impl = arg;
1062  assert(impl);
1063 
1064  struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1065 
1066  // Append the fiber to the queue of waiting fibers.
1067  sllist_push_back(&impl->queue, &ctx->task._node);
1068 #if !LELY_NO_THREADS
1069  // Unlock the inner mutex locked in ev_fiber_mtx_lock().
1070  mtx_unlock(&impl->mtx);
1071 #endif
1072 
1073  return NULL;
1074 }
1077 
// The function passed to fiber_resume_with() by ev_fiber_cnd_wait().
// fiber: stored as the handle of the suspended fiber (see ev_fiber_resume()).
// arg:   the struct ev_fiber_cnd_wait of the wait operation; the inner mutex
//        of its fiber mutex is held on entry (when threads are enabled).
// Queues the wait on the condition variable, then releases the fiber mutex,
// handing it to the next fiber waiting for it, if any. Always returns NULL.
1076 static fiber_t *
1077 ev_fiber_cnd_wait_func(fiber_t *fiber, void *arg)
1078 {
1079  struct ev_fiber_cnd_wait *wait = arg;
1080  assert(wait);
1081  ev_fiber_cnd_t *cond = wait->cond;
1082  assert(cond);
1083  struct ev_fiber_cnd_impl *cond_impl = cond->_impl;
1084  assert(cond_impl);
1085  ev_fiber_mtx_t *mtx = wait->mtx;
1086  struct ev_fiber_mtx_impl *mtx_impl = mtx->_impl;
1087  assert(mtx_impl);
1088 
1089  struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1090 
// Record which task ev_fiber_cnd_wake() has to post to wake this fiber.
1091  wait->task = &ctx->task;
1092 #if !LELY_NO_THREADS
1093  mtx_lock(&cond_impl->mtx);
1094 #endif
1095  sllist_push_back(&cond_impl->queue, &wait->node);
1096 #if !LELY_NO_THREADS
1097  mtx_unlock(&cond_impl->mtx);
1098 #endif
1099 
1100  // Unlock the mutex for the duration of the wait.
1101  mtx_impl->ctx = NULL;
1102  assert(mtx_impl->locked == 1);
1103  mtx_impl->locked = 0;
// NOTE(review): listing line 1107, the initializer of `task`, is missing from
// this extract; presumably it pops the front of mtx_impl->queue as in
// ev_fiber_mtx_unlock() -- confirm against the original file.
1104  struct ev_task *task =
1108  if (task)
1109  mtx_impl->locked++;
1110 #if !LELY_NO_THREADS
1111  mtx_unlock(&mtx_impl->mtx);
1112 #endif
1113  // cppcheck-suppress duplicateCondition
1114  if (task)
1115  ev_exec_post(task->exec, task);
1116 
1117  return NULL;
1118 }
1119 
// Wakes the fiber whose wait context contains `node` (a node taken from the
// queue of a condition variable). The fiber must re-acquire its fiber mutex
// before continuing: if the mutex is free it is marked locked on the fiber's
// behalf and the fiber's task is posted; otherwise the task is appended to the
// queue of fibers waiting for the mutex.
1120 static void
1121 ev_fiber_cnd_wake(struct slnode *node)
1122 {
1123  assert(node);
1124  struct ev_fiber_cnd_wait *wait =
1125  structof(node, struct ev_fiber_cnd_wait, node);
1126  ev_fiber_mtx_t *mtx = wait->mtx;
1127  assert(mtx);
1128  struct ev_fiber_mtx_impl *impl = mtx->_impl;
1129  assert(impl);
1130  struct ev_task *task = wait->task;
1131  assert(task);
1132 
1133 #if !LELY_NO_THREADS
1134  mtx_lock(&impl->mtx);
1135 #endif
1136  if (impl->locked) {
// The mutex is taken: the fiber is woken later by whoever releases it.
1137  sllist_push_back(&impl->queue, &task->_node);
1138  task = NULL;
1139  } else {
1140  assert(sllist_empty(&impl->queue));
// The owner (impl->ctx) is filled in by the fiber itself once it resumes in
// ev_fiber_cnd_wait().
1141  impl->locked = 1;
1142  }
1143 #if !LELY_NO_THREADS
1144  mtx_unlock(&impl->mtx);
1145 #endif
1146 
1147  if (task)
1148  ev_exec_post(task->exec, task);
1149 }
1150 
1151 #endif // !LELY_NO_MALLOC
This header file is part of the utilities library; it contains the native and platform-independent error number declarations.
@ ERRNUM_PERM
Operation not permitted.
Definition: errnum.h:208
@ ERRNUM_INVAL
Invalid argument.
Definition: errnum.h:132
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:424
This header file is part of the event library; it contains the abstract task executor interface.
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks if task is NULL.
Definition: exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:112
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread storage duration.
Definition: features.h:249
#define FIBER_GUARD_STACK
A flag specifying a fiber to add a guard page when allocating the stack frame so that the kernel gene...
Definition: fiber.h:85
void * fiber_data(const fiber_t *fiber)
Returns a pointer to the data region of the specified fiber, or of the calling fiber if fiber is NULL.
Definition: fiber.c:416
fiber_t * fiber_resume_with(fiber_t *fiber, fiber_func_t *func, void *arg)
Suspends the calling fiber and resumes the specified fiber, optionally executing a function before re...
Definition: fiber.c:430
int fiber_thrd_init(int flags)
Initializes the fiber associated with the calling thread.
Definition: fiber.c:210
fiber_t * fiber_resume(fiber_t *fiber)
Equivalent to fiber_resume_with(fiber, NULL, NULL).
Definition: fiber.c:424
void fiber_thrd_fini(void)
Finalizes the fiber associated with the calling thread.
Definition: fiber.c:242
void fiber_destroy(fiber_t *fiber)
Destroys the specified fiber.
Definition: fiber.c:395
fiber_t * fiber_create(fiber_func_t *func, void *arg, int flags, size_t data_size, size_t stack_size)
Creates a new fiber, allocates a stack and sets up a calling environment to begin executing the speci...
Definition: fiber.c:260
ev_exec_t * ev_fiber_exec_get_inner_exec(const ev_exec_t *exec_)
Returns a pointer to the inner executor of a fiber executor.
Definition: fiber_exec.c:361
int ev_fiber_mtx_lock(ev_fiber_mtx_t *mtx)
Suspends the currently running fiber until it locks the fiber mutex at mtx.
Definition: fiber_exec.c:428
void ev_fiber_thrd_fini(void)
Finalizes the calling thread and prevents further use by fiber executors.
Definition: fiber_exec.c:232
int ev_fiber_mtx_init(ev_fiber_mtx_t *mtx, int type)
Creates a fiber mutex object with properties indicated by type, which must have one of the four value...
Definition: fiber_exec.c:378
int ev_fiber_thrd_init(int flags, size_t stack_size, size_t max_unused)
Initializes the calling thread for use by fiber executors.
Definition: fiber_exec.c:206
int ev_fiber_mtx_unlock(ev_fiber_mtx_t *mtx)
Unlocks the fiber mutex at mtx.
Definition: fiber_exec.c:525
void ev_fiber_cnd_destroy(ev_fiber_cnd_t *cond)
Releases all resources used by the fiber condition variable at cond.
Definition: fiber_exec.c:590
void ev_fiber_mtx_destroy(ev_fiber_mtx_t *mtx)
Releases any resources used by the fiber mutex at mtx.
Definition: fiber_exec.c:411
int ev_fiber_cnd_signal(ev_fiber_cnd_t *cond)
Unblocks one of the fibers that are blocked on the fiber condition variable at cond at the time of the call.
Definition: fiber_exec.c:606
int ev_fiber_cnd_broadcast(ev_fiber_cnd_t *cond)
Unblocks all of the fibers that are blocked on the fiber condition variable at cond at the time of the call.
Definition: fiber_exec.c:627
int ev_fiber_cnd_init(ev_fiber_cnd_t *cond)
Creates a fiber condition variable.
Definition: fiber_exec.c:566
void ev_fiber_exec_destroy(ev_exec_t *exec)
Destroys a fiber executor.
Definition: fiber_exec.c:352
void ev_fiber_await(ev_future_t *future)
Suspends a currently running fiber until the specified future becomes ready (or is cancelled).
Definition: fiber_exec.c:369
#define LELY_EV_FIBER_MAX_UNUSED
The maximum number of unused fibers per thread.
Definition: fiber_exec.c:43
int ev_fiber_cnd_wait(ev_fiber_cnd_t *cond, ev_fiber_mtx_t *mtx)
Atomically unlocks the fiber mutex at mtx and endeavors to block until the fiber condition variable a...
Definition: fiber_exec.c:652
int ev_fiber_mtx_trylock(ev_fiber_mtx_t *mtx)
Endeavors to lock the fiber mutex at mtx.
Definition: fiber_exec.c:476
ev_exec_t * ev_fiber_exec_create(ev_exec_t *inner_exec)
Creates a fiber executor.
Definition: fiber_exec.c:325
This header file is part of the event library; it contains the fiber executor, mutex and condition variable declarations.
@ ev_fiber_nomem
Indicates that the requested operation failed because it was unable to allocate memory.
Definition: fiber_exec.h:62
@ ev_fiber_busy
Indicates that the requested operation failed because a resource requested by a test and return funct...
Definition: fiber_exec.h:57
@ ev_fiber_error
Indicates that the requested operation failed.
Definition: fiber_exec.h:47
@ ev_fiber_success
Indicates that the requested operation succeeded.
Definition: fiber_exec.h:45
@ ev_fiber_mtx_recursive
A fiber mutex type that supports recursive locking.
Definition: fiber_exec.h:74
void ev_future_submit(ev_future_t *future, struct ev_task *task)
Submits a task to be executed once the specified future is ready.
Definition: future.c:364
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
This is the internal header file of the event library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdint....
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
The implementation of a fiber condition variable.
Definition: fiber_exec.c:191
struct sllist queue
The queue of fibers waiting for the condition variable to be signaled.
Definition: fiber_exec.c:200
mtx_t mtx
The mutex protecting queue.
Definition: fiber_exec.c:194
A synchronization primitive (similar to the standard C11 condition variable) that can be used to bloc...
Definition: fiber_exec.h:92
The context of a wait operation on a fiber condition variable.
Definition: fiber_exec.c:177
struct slnode node
The node in the queue of fibers waiting on cond.
Definition: fiber_exec.c:179
ev_fiber_cnd_t * cond
A pointer to the condition variable passed to ev_fiber_cnd_wait().
Definition: fiber_exec.c:181
ev_fiber_mtx_t * mtx
A pointer to the mutex passed to ev_fiber_cnd_wait().
Definition: fiber_exec.c:183
struct ev_task * task
A pointer to the task (in ev_fiber_ctx) used to wake up the fiber.
Definition: fiber_exec.c:185
The context of a fiber used for executing tasks.
Definition: fiber_exec.c:130
struct sllist queue
The queue of deferred tasks.
Definition: fiber_exec.c:140
fiber_t * fiber
A pointer to the fiber containing this context.
Definition: fiber_exec.c:132
struct ev_fiber_ctx * next
A pointer to the next fiber in the list of unused fibers.
Definition: fiber_exec.c:134
struct ev_task task
The task used to resume the fiber.
Definition: fiber_exec.c:138
struct ev_fiber_exec * exec
The executor using this fiber.
Definition: fiber_exec.c:136
The implementation of a fiber executor.
Definition: fiber_exec.c:98
struct ev_fiber_thrd * thr
A pointer to the ev_fiber_thrd instance for this executor.
Definition: fiber_exec.c:106
ev_exec_t * inner_exec
A pointer to the inner executor.
Definition: fiber_exec.c:102
struct sllist queue
The queue of tasks submitted to this executor.
Definition: fiber_exec.c:116
int posted
A flag indicating whether task has been posted to inner_exec.
Definition: fiber_exec.c:114
struct ev_task task
The task used to create new fibers.
Definition: fiber_exec.c:104
const struct ev_exec_vtbl * exec_vptr
A pointer to the virtual table for the executor interface.
Definition: fiber_exec.c:100
mtx_t mtx
The mutex protecting posted and queue.
Definition: fiber_exec.c:111
size_t pending
The number of pending fibers available to execute a task.
Definition: fiber_exec.c:108
The implementation of a fiber mutex.
Definition: fiber_exec.c:156
int type
The type of mutex: ev_fiber_mtx_plain or ev_fiber_mtx_recursive.
Definition: fiber_exec.c:158
struct sllist queue
The queue of fibers waiting to acquire the lock.
Definition: fiber_exec.c:168
struct ev_fiber_ctx * ctx
A pointer to the fiber holding the lock.
Definition: fiber_exec.c:164
size_t locked
The number of times the mutex has been recursively locked.
Definition: fiber_exec.c:166
mtx_t mtx
The mutex protecting locked and queue.
Definition: fiber_exec.c:161
A synchronization primitive (similar to the standard C11 mutex) that can be used to protect shared da...
Definition: fiber_exec.h:82
The parameters used for creating fibers on this thread and the list of unused fibers.
Definition: fiber_exec.c:55
size_t max_unused
The maximum number of unused fibers for this thread.
Definition: fiber_exec.c:67
size_t num_unused
The number of unused fibers.
Definition: fiber_exec.c:71
size_t stack_size
The size (in bytes) of the stack frame allocated for each fiber.
Definition: fiber_exec.c:65
struct ev_fiber_ctx * curr
A pointer to the currently running fiber.
Definition: fiber_exec.c:73
struct ev_fiber_ctx * unused
The list of unused fibers.
Definition: fiber_exec.c:69
fiber_t * prev
A pointer to the previously running (suspended) fiber.
Definition: fiber_exec.c:75
size_t refcnt
The number of invocations of ev_fiber_thrd_init() minus the number of invocations of ev_fiber_thrd...
Definition: fiber_exec.c:61
int flags
The flags used when creating each fiber.
Definition: fiber_exec.c:63
A future.
Definition: future.c:66
An executable task.
Definition: task.h:41
ev_task_func_t * func
The function to be invoked when the task is run.
Definition: task.h:45
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A fiber.
Definition: fiber.c:130
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
This header file is part of the event library; it contains the task declarations.
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
@ thrd_nomem
Indicates that the requested operation failed because it was unable to allocate memory.
Definition: threads.h:138
@ thrd_error
Indicates that the requested operation failed.
Definition: threads.h:123
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109