Lely core libraries 2.3.4
fiber_exec.c
Go to the documentation of this file.
1
24#include "ev.h"
25
26#if !LELY_NO_MALLOC
27
28#if !LELY_NO_THREADS
29#include <lely/libc/threads.h>
30#endif
31#include <lely/ev/exec.h>
32#include <lely/ev/fiber_exec.h>
33#include <lely/ev/task.h>
34#include <lely/util/errnum.h>
35#include <lely/util/util.h>
36
37#include <assert.h>
38#include <stdint.h>
39#include <stdlib.h>
40
41#ifndef LELY_EV_FIBER_MAX_UNUSED
43#define LELY_EV_FIBER_MAX_UNUSED 16
44#endif
45
46struct ev_fiber_ctx;
47
52#if LELY_NO_THREADS
53static struct ev_fiber_thrd {
54#else
56#endif
61 size_t refcnt;
63 int flags;
65 size_t stack_size;
67 size_t max_unused;
71 size_t num_unused;
77
78static void ev_fiber_exec_on_task_init(ev_exec_t *exec);
79static void ev_fiber_exec_on_task_fini(ev_exec_t *exec);
80static int ev_fiber_exec_dispatch(ev_exec_t *exec, struct ev_task *task);
81static void ev_fiber_exec_post(ev_exec_t *exec, struct ev_task *task);
82static void ev_fiber_exec_defer(ev_exec_t *exec, struct ev_task *task);
83static size_t ev_fiber_exec_abort(ev_exec_t *exec, struct ev_task *task);
84
85// clang-format off
86static const struct ev_exec_vtbl ev_fiber_exec_vtbl = {
87 &ev_fiber_exec_on_task_init,
88 &ev_fiber_exec_on_task_fini,
89 &ev_fiber_exec_dispatch,
90 &ev_fiber_exec_post,
91 &ev_fiber_exec_defer,
92 &ev_fiber_exec_abort,
93 NULL
94};
95// clang-format on
96
100 const struct ev_exec_vtbl *exec_vptr;
104 struct ev_task task;
108 size_t pending;
109#if !LELY_NO_THREADS
112#endif
116 struct sllist queue;
117};
118
119static void ev_fiber_exec_func(struct ev_task *task);
120
121static inline struct ev_fiber_exec *ev_fiber_exec_from_exec(
122 const ev_exec_t *exec);
123
124static int ev_fiber_exec_post_ctx(struct ev_fiber_exec *exec);
125
138 struct ev_task task;
140 struct sllist queue;
141};
142
143static struct ev_fiber_ctx *ev_fiber_ctx_create(struct ev_fiber_exec *exec);
144static void ev_fiber_ctx_destroy(struct ev_fiber_ctx *ctx);
145
146static fiber_t *ev_fiber_ctx_fiber_func(fiber_t *fiber, void *arg);
147static void ev_fiber_ctx_task_func(struct ev_task *task);
148
149static struct ev_fiber_ctx *ev_fiber_resume(fiber_t *fiber);
150static fiber_t *ev_fiber_await_func(fiber_t *fiber, void *arg);
151
152static void ev_fiber_return(void);
153static fiber_t *ev_fiber_return_func(fiber_t *fiber, void *arg);
154
158 int type;
159#if !LELY_NO_THREADS
162#endif
166 size_t locked;
168 struct sllist queue;
169};
170
171static fiber_t *ev_fiber_mtx_lock_func(fiber_t *fiber, void *arg);
172
179 struct slnode node;
185 struct ev_task *task;
186};
187
188static fiber_t *ev_fiber_cnd_wait_func(fiber_t *fiber, void *arg);
189
192#if !LELY_NO_THREADS
195#endif
200 struct sllist queue;
201};
202
203static void ev_fiber_cnd_wake(struct slnode *node);
204
205int
207{
208 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
209
210 if (thr->refcnt++)
211 return 1;
212
213 int result = fiber_thrd_init(flags & ~FIBER_GUARD_STACK);
214 if (result == -1) {
215 thr->refcnt--;
216 } else {
217 thr->flags = flags;
218 thr->stack_size = stack_size;
219
220 if (!(thr->max_unused = max_unused))
222 thr->unused = NULL;
223 thr->num_unused = 0;
224
225 thr->curr = NULL;
226 thr->prev = NULL;
227 }
228 return result;
229}
230
231void
233{
234 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
235 assert(!thr->curr);
236
237 if (--thr->refcnt)
238 return;
239
240 struct ev_fiber_ctx *ctx;
241 while ((ctx = thr->unused)) {
242 thr->unused = ctx->next;
243 fiber_destroy(ctx->fiber);
244 }
245
247}
248
249void *
250ev_fiber_exec_alloc(void)
251{
252 struct ev_fiber_exec *exec = malloc(sizeof(*exec));
253 if (!exec) {
254#if !LELY_NO_ERRNO
255 set_errc(errno2c(errno));
256#endif
257 return NULL;
258 }
259 // Suppress a GCC maybe-uninitialized warning.
260 exec->exec_vptr = NULL;
261 // cppcheck-suppress memleak symbolName=exec
262 return &exec->exec_vptr;
263}
264
265void
266ev_fiber_exec_free(void *ptr)
267{
268 if (ptr)
269 free(ev_fiber_exec_from_exec(ptr));
270}
271
272ev_exec_t *
273ev_fiber_exec_init(ev_exec_t *exec_, ev_exec_t *inner_exec)
274{
275 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
276 assert(thr->refcnt);
277 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
278 assert(inner_exec);
279
280 exec->exec_vptr = &ev_fiber_exec_vtbl;
281
282 exec->inner_exec = inner_exec;
283 exec->task = (struct ev_task)EV_TASK_INIT(
284 exec->inner_exec, &ev_fiber_exec_func);
285
286 exec->thr = thr;
287
288 exec->pending = 0;
289
290#if !LELY_NO_THREADS
291 if (mtx_init(&exec->mtx, mtx_plain) != thrd_success)
292 return NULL;
293#endif
294
295 exec->posted = 0;
296
297 sllist_init(&exec->queue);
298
299 return exec_;
300}
301
302void
303ev_fiber_exec_fini(ev_exec_t *exec_)
304{
305 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
306 assert(exec->thr == &ev_fiber_thrd);
307
308 ev_fiber_exec_abort(exec_, NULL);
309
310#if !LELY_NO_THREADS
311 mtx_lock(&exec->mtx);
312#endif
313 // Abort ev_fiber_exec_func().
314 if (exec->posted && ev_exec_abort(exec->task.exec, &exec->task))
315 exec->posted = 0;
316 assert(exec->posted == 0);
317#if !LELY_NO_THREADS
318 mtx_unlock(&exec->mtx);
319
320 mtx_destroy(&exec->mtx);
321#endif
322}
323
324ev_exec_t *
326{
327 int errc = 0;
328
329 ev_exec_t *exec = ev_fiber_exec_alloc();
330 if (!exec) {
331 errc = get_errc();
332 goto error_alloc;
333 }
334
335 ev_exec_t *tmp = ev_fiber_exec_init(exec, inner_exec);
336 if (!tmp) {
337 errc = get_errc();
338 goto error_init;
339 }
340 exec = tmp;
341
342 return exec;
343
344error_init:
345 ev_fiber_exec_free((void *)exec);
346error_alloc:
347 set_errc(errc);
348 return NULL;
349}
350
351void
353{
354 if (exec) {
355 ev_fiber_exec_fini(exec);
356 ev_fiber_exec_free((void *)exec);
357 }
358}
359
360ev_exec_t *
362{
363 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
364
365 return exec->inner_exec;
366}
367
368void
370{
371 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
372 assert(thr->curr);
373
374 thr->prev = fiber_resume_with(thr->prev, ev_fiber_await_func, future);
375}
376
377int
379{
380 assert(mtx);
381
382 if (type & ~ev_fiber_mtx_recursive) {
384 return ev_fiber_error;
385 }
386
387 struct ev_fiber_mtx_impl *impl = malloc(sizeof(*impl));
388 if (!impl)
389 return ev_fiber_nomem;
390
391 impl->type = type;
392
393#if !LELY_NO_THREADS
394 switch (mtx_init(&impl->mtx, mtx_plain)) {
395 case thrd_error: free(impl); return ev_fiber_error;
396 case thrd_nomem: free(impl); return ev_fiber_nomem;
397 default: break;
398 }
399#endif
400
401 impl->ctx = NULL;
402 impl->locked = 0;
403 sllist_init(&impl->queue);
404
405 mtx->_impl = impl;
406
407 return ev_fiber_success;
408}
409
410void
412{
413 assert(mtx);
414 struct ev_fiber_mtx_impl *impl = mtx->_impl;
415 assert(impl);
416
417 assert(!impl->locked);
418 assert(sllist_empty(&impl->queue));
419#if !LELY_NO_THREADS
420 mtx_destroy(&impl->mtx);
421#endif
422 free(impl);
423
424 mtx->_impl = NULL;
425}
426
427int
429{
430 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
431 assert(mtx);
432 struct ev_fiber_mtx_impl *impl = mtx->_impl;
433 assert(impl);
434
435 if (!thr->curr) {
437 return ev_fiber_error;
438 }
439
440#if !LELY_NO_THREADS
441 mtx_lock(&impl->mtx);
442#endif
443 if (impl->locked) {
444 if (impl->ctx == thr->curr) {
445 if (!(impl->type & ev_fiber_mtx_recursive)) {
446#if !LELY_NO_THREADS
447 mtx_unlock(&impl->mtx);
448#endif
450 return ev_fiber_error;
451 }
452 assert(impl->locked < SIZE_MAX);
453 impl->locked++;
454 } else {
455 // The inner mutex will be unlocked in
456 // ev_fiber_mtx_lock_func().
457 thr->prev = fiber_resume_with(thr->prev,
458 ev_fiber_mtx_lock_func, impl);
459 impl->ctx = thr->curr;
460 assert(impl->locked == 1);
461 }
462 } else {
463 assert(impl->ctx == NULL);
464 assert(sllist_empty(&impl->queue));
465 impl->ctx = thr->curr;
466 impl->locked = 1;
467#if !LELY_NO_THREADS
468 mtx_unlock(&impl->mtx);
469#endif
470 }
471
472 return ev_fiber_success;
473}
474
475int
477{
478 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
479 assert(mtx);
480 struct ev_fiber_mtx_impl *impl = mtx->_impl;
481 assert(impl);
482
483 if (!thr->curr) {
485 return ev_fiber_error;
486 }
487
488#if !LELY_NO_THREADS
489 mtx_lock(&impl->mtx);
490#endif
491 if (impl->locked) {
492 if (impl->ctx == thr->curr) {
493 if (!(impl->type & ev_fiber_mtx_recursive)) {
494#if !LELY_NO_THREADS
495 mtx_unlock(&impl->mtx);
496#endif
498 return ev_fiber_error;
499 }
500 assert(impl->locked < SIZE_MAX);
501 impl->locked++;
502#if !LELY_NO_THREADS
503 mtx_unlock(&impl->mtx);
504#endif
505 return ev_fiber_success;
506 } else {
507#if !LELY_NO_THREADS
508 mtx_unlock(&impl->mtx);
509#endif
510 return ev_fiber_busy;
511 }
512 } else {
513 assert(impl->ctx == NULL);
514 assert(sllist_empty(&impl->queue));
515 impl->ctx = thr->curr;
516 impl->locked = 1;
517#if !LELY_NO_THREADS
518 mtx_unlock(&impl->mtx);
519#endif
520 return ev_fiber_success;
521 }
522}
523
524int
526{
527 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
528 assert(mtx);
529 struct ev_fiber_mtx_impl *impl = mtx->_impl;
530 assert(impl);
531
532 if (!thr->curr) {
534 return ev_fiber_error;
535 }
536
537 struct ev_task *task = NULL;
538
539#if !LELY_NO_THREADS
540 mtx_lock(&impl->mtx);
541#endif
542 if (impl->ctx != thr->curr || !impl->locked) {
543#if !LELY_NO_THREADS
544 mtx_unlock(&impl->mtx);
545#endif
547 return ev_fiber_error;
548 }
549 assert((impl->type & ev_fiber_mtx_recursive) || impl->locked == 1);
550 if (!--impl->locked) {
551 impl->ctx = NULL;
552 if ((task = ev_task_from_node(sllist_pop_front(&impl->queue))))
553 impl->locked++;
554 }
555#if !LELY_NO_THREADS
556 mtx_unlock(&impl->mtx);
557#endif
558
559 if (task)
560 ev_exec_post(task->exec, task);
561
562 return ev_fiber_success;
563}
564
565int
567{
568 assert(cond);
569
570 struct ev_fiber_cnd_impl *impl = malloc(sizeof(*impl));
571 if (!impl)
572 return ev_fiber_nomem;
573
574#if !LELY_NO_THREADS
575 switch (mtx_init(&impl->mtx, mtx_plain)) {
576 case thrd_error: free(impl); return ev_fiber_error;
577 case thrd_nomem: free(impl); return ev_fiber_nomem;
578 default: break;
579 }
580#endif
581
582 sllist_init(&impl->queue);
583
584 cond->_impl = impl;
585
586 return ev_fiber_success;
587}
588
589void
591{
592 assert(cond);
593 struct ev_fiber_cnd_impl *impl = cond->_impl;
594 assert(impl);
595
596 assert(sllist_empty(&impl->queue));
597#if !LELY_NO_THREADS
598 mtx_destroy(&impl->mtx);
599#endif
600 free(impl);
601
602 cond->_impl = NULL;
603}
604
605int
607{
608 assert(cond);
609 struct ev_fiber_cnd_impl *impl = cond->_impl;
610 assert(impl);
611
612#if !LELY_NO_THREADS
613 mtx_lock(&impl->mtx);
614#endif
615 struct slnode *node = sllist_pop_front(&impl->queue);
616#if !LELY_NO_THREADS
617 mtx_unlock(&impl->mtx);
618#endif
619
620 if (node)
621 ev_fiber_cnd_wake(node);
622
623 return ev_fiber_success;
624}
625
626int
628{
629 assert(cond);
630 struct ev_fiber_cnd_impl *impl = cond->_impl;
631 assert(impl);
632
633 struct sllist queue;
634 sllist_init(&queue);
635
636#if !LELY_NO_THREADS
637 mtx_lock(&impl->mtx);
638#endif
639 sllist_append(&queue, &impl->queue);
640#if !LELY_NO_THREADS
641 mtx_unlock(&impl->mtx);
642#endif
643
644 struct slnode *node;
645 while ((node = sllist_pop_front(&queue)))
646 ev_fiber_cnd_wake(node);
647
648 return ev_fiber_success;
649}
650
651int
653{
654 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
655 assert(mtx);
656 struct ev_fiber_mtx_impl *impl = mtx->_impl;
657 assert(impl);
658
659 if (!thr->curr) {
661 return ev_fiber_error;
662 }
663
664 struct ev_fiber_cnd_wait wait = { .cond = cond, .mtx = mtx };
665
666#if !LELY_NO_THREADS
667 mtx_lock(&impl->mtx);
668#endif
669 if (impl->ctx != thr->curr || impl->locked != 1) {
670#if !LELY_NO_THREADS
671 mtx_unlock(&impl->mtx);
672#endif
674 return ev_fiber_error;
675 }
676 assert(impl->locked);
677
678 // The inner mutex will be unlocked in ev_fiber_cnd_wait_func().
679 thr->prev = fiber_resume_with(thr->prev, ev_fiber_cnd_wait_func, &wait);
680
681#if !LELY_NO_THREADS
682 mtx_lock(&impl->mtx);
683#endif
684 assert(!impl->ctx);
685 impl->ctx = thr->curr;
686 assert(impl->locked == 1);
687#if !LELY_NO_THREADS
688 mtx_unlock(&impl->mtx);
689#endif
690
691 return ev_fiber_success;
692}
693
694static void
695ev_fiber_exec_on_task_init(ev_exec_t *exec_)
696{
697 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
698
700}
701
702static void
703ev_fiber_exec_on_task_fini(ev_exec_t *exec_)
704{
705 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
706
708}
709
710static int
711ev_fiber_exec_dispatch(ev_exec_t *exec_, struct ev_task *task)
712{
713 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
714 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
715 assert(task);
716 assert(!task->exec || task->exec == exec_);
717
718 // Post the task if the executor is not currently executing a fiber.
719 struct ev_fiber_ctx *ctx = thr->curr;
720 if (!ctx || ctx->exec != exec) {
721 ev_fiber_exec_post(exec_, task);
722 return 0;
723 }
724
725 if (!task->exec)
726 task->exec = exec_;
727
728 // Execute the task immediately.
729 if (task->func)
730 task->func(task);
731
732 return 1;
733}
734
735static void
736ev_fiber_exec_post(ev_exec_t *exec_, struct ev_task *task)
737{
738 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
739 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
740 assert(task);
741 assert(!task->exec || task->exec == exec_);
742
743 if (!task->exec)
744 task->exec = exec_;
745 ev_fiber_exec_on_task_init(exec_);
746
747 // Append the task to the queue.
748#if !LELY_NO_THREADS
749 mtx_lock(&exec->mtx);
750#endif
751 sllist_push_back(&exec->queue, &task->_node);
752 int post = !exec->posted && exec->thr != thr;
753 if (post)
754 exec->posted = 1;
755#if !LELY_NO_THREADS
756 mtx_unlock(&exec->mtx);
757#endif
758
759 // If no pending fibers are available (and the calling thread is the
760 // thread of this executor), try to post a new fiber immediately.
761 if (exec->thr == thr && !exec->pending)
762 ev_fiber_exec_post_ctx(exec);
763 // Otherwise, post a task to post a fiber.
764 if (post)
765 ev_exec_post(exec->task.exec, &exec->task);
766}
767
768static void
769ev_fiber_exec_defer(ev_exec_t *exec_, struct ev_task *task)
770{
771 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
772 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
773 assert(task);
774 assert(!task->exec || task->exec == exec_);
775
776 // Post the task if the executor is not currently executing a fiber.
777 struct ev_fiber_ctx *ctx = thr->curr;
778 if (!ctx || ctx->exec != exec) {
779 ev_fiber_exec_post(exec_, task);
780 return;
781 }
782
783 if (!task->exec)
784 task->exec = exec_;
785 ev_fiber_exec_on_task_init(exec_);
786
787 // Push the task to the deferred queue of the fiber.
788 sllist_push_back(&ctx->queue, &task->_node);
789}
790
791static size_t
792ev_fiber_exec_abort(ev_exec_t *exec_, struct ev_task *task)
793{
794 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
795 struct ev_fiber_exec *exec = ev_fiber_exec_from_exec(exec_);
796
797 struct sllist queue;
798 sllist_init(&queue);
799
800 struct ev_fiber_ctx *ctx = thr->curr;
801 if (ctx && ctx->exec == exec) {
802 // Try to find and abort the task(s) in the deferred queue of
803 // the fiber.
804 if (!task)
805 sllist_append(&queue, &ctx->queue);
806 else if (sllist_remove(&ctx->queue, &task->_node))
807 sllist_push_back(&queue, &task->_node);
808 }
809
810#if !LELY_NO_THREADS
811 mtx_lock(&exec->mtx);
812#endif
813 if (!task)
815 else if (sllist_remove(&exec->queue, &task->_node))
816 sllist_push_back(&queue, &task->_node);
817#if !LELY_NO_THREADS
819#endif
820
821 size_t n = 0;
822 while (sllist_pop_front(&queue)) {
823 ev_fiber_exec_on_task_fini(exec_);
824 n += n < SIZE_MAX;
825 }
826 return n;
827}
828
829static void
830ev_fiber_exec_func(struct ev_task *task)
831{
832 assert(task);
833 struct ev_fiber_exec *exec = structof(task, struct ev_fiber_exec, task);
834 assert(exec->thr == &ev_fiber_thrd);
835
836#if !LELY_NO_THREADS
837 mtx_lock(&exec->mtx);
838#endif
839 assert(exec->posted);
840 exec->posted = 0;
841#if !LELY_NO_THREADS
842 mtx_unlock(&exec->mtx);
843#endif
844
845 // If no pending fibers are available, try to post a new fiber.
846 if (!exec->pending)
847 ev_fiber_exec_post_ctx(exec);
848}
849
850static inline struct ev_fiber_exec *
851ev_fiber_exec_from_exec(const ev_exec_t *exec)
852{
853 assert(exec);
854
855 return structof(exec, struct ev_fiber_exec, exec_vptr);
856}
857
858static int
859ev_fiber_exec_post_ctx(struct ev_fiber_exec *exec)
860{
861 int errsv = get_errc();
862 struct ev_fiber_ctx *ctx = ev_fiber_ctx_create(exec);
863 if (!ctx) {
864 // Ignore the error; one of the existing fibers can pick up the
865 // submitted task.
866 set_errc(errsv);
867 return -1;
868 }
869
870 exec->pending++;
871 ev_exec_post(ctx->task.exec, &ctx->task);
872
873 return 0;
874}
875
876static struct ev_fiber_ctx *
877ev_fiber_ctx_create(struct ev_fiber_exec *exec)
878{
879 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
880 assert(thr->refcnt);
881 assert(exec);
882 assert(exec->thr == thr);
883
884 // Use a fiber from the unused list before creating a new one.
885 struct ev_fiber_ctx *ctx = thr->unused;
886 if (ctx) {
887 thr->unused = ctx->next;
888 ctx->next = NULL;
889 assert(thr->num_unused);
890 thr->num_unused--;
891 } else {
892 fiber_t *fiber = fiber_create(&ev_fiber_ctx_fiber_func, NULL,
893 thr->flags, sizeof(struct ev_fiber_ctx),
894 thr->stack_size);
895 if (!fiber)
896 return NULL;
897 ctx = fiber_data(fiber);
898 *ctx = (struct ev_fiber_ctx){
899 .fiber = fiber,
900 };
901 sllist_init(&ctx->queue);
902 }
903
904 // Associate the fiber with the executor.
905 ctx->exec = exec;
906 ctx->task = (struct ev_task)EV_TASK_INIT(
907 exec->inner_exec, &ev_fiber_ctx_task_func);
908
909 return ctx;
910}
911
912static void
913ev_fiber_ctx_destroy(struct ev_fiber_ctx *ctx)
914{
915 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
916 assert(thr->refcnt);
917
918 if (ctx) {
919 assert(sllist_empty(&ctx->queue));
920 if (thr->num_unused < thr->max_unused) {
921 ctx->next = thr->unused;
922 thr->unused = ctx;
923 thr->num_unused++;
924 } else {
925 fiber_destroy(ctx->fiber);
926 }
927 }
928}
929
930static fiber_t *
931ev_fiber_ctx_fiber_func(fiber_t *fiber, void *arg)
932{
933 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
934 (void)arg;
935 struct ev_fiber_ctx *ctx = fiber_data(NULL);
936
937 thr->prev = fiber;
938
939 struct ev_task *task = NULL;
940 for (;;) {
941 if (!task) {
942 // Check if there are any tasks waiting on the queue of
943 // the executor.
944 assert(ctx->exec->pending);
945 ctx->exec->pending--;
946#if !LELY_NO_THREADS
947 mtx_lock(&ctx->exec->mtx);
948#endif
949 task = ev_task_from_node(
950 sllist_pop_front(&ctx->exec->queue));
951#if !LELY_NO_THREADS
952 mtx_unlock(&ctx->exec->mtx);
953#endif
954 // If no tasks are available, return and destroy the
955 // fiber or put it on the unused list.
956 if (!task) {
957 ev_fiber_return();
958 continue;
959 }
960 }
961
962 // Execute the task.
963 ev_exec_t *exec = task->exec;
964 assert(exec == &ctx->exec->exec_vptr);
965 if (task->func)
966 task->func(task);
967 ev_fiber_exec_on_task_fini(exec);
968
969 // Check if there are any deferred tasks remaining.
971 if (!task)
972 ctx->exec->pending++;
973
974 ev_fiber_await(NULL);
975 }
976
977 return NULL;
978}
979
980static void
981ev_fiber_ctx_task_func(struct ev_task *task)
982{
983 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
984 assert(task);
985 struct ev_fiber_ctx *ctx = structof(task, struct ev_fiber_ctx, task);
986
987 thr->curr = ctx;
989 assert(!fiber);
990 (void)fiber;
991}
992
993static struct ev_fiber_ctx *
994ev_fiber_resume(fiber_t *fiber)
995{
996 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
997
998 struct ev_fiber_ctx *ctx = thr->curr;
999 thr->curr = NULL;
1000 ctx->fiber = fiber;
1001
1002 // If there are tasks on the queue, but no pending fibers to execute
1003 // them, try to post a new fiber.
1004 struct ev_fiber_exec *exec = ctx->exec;
1005 if (!exec->pending) {
1006#if !LELY_NO_THREADS
1007 mtx_lock(&exec->mtx);
1008#endif
1009 int empty = sllist_empty(&exec->queue);
1010#if !LELY_NO_THREADS
1011 mtx_unlock(&exec->mtx);
1012#endif
1013 if (!empty)
1014 ev_fiber_exec_post_ctx(exec);
1015 }
1016
1017 return ctx;
1018}
1019
1020static fiber_t *
1021ev_fiber_await_func(fiber_t *fiber, void *arg)
1022{
1023 ev_future_t *future = arg;
1024
1025 struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1026
1027 if (future)
1028 ev_future_submit(future, &ctx->task);
1029 else
1030 ev_exec_post(ctx->task.exec, &ctx->task);
1031
1032 return NULL;
1033}
1034
1035static void
1036ev_fiber_return(void)
1037{
1038 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
1039 assert(thr->curr);
1040
1041 thr->prev = fiber_resume_with(thr->prev, ev_fiber_return_func, NULL);
1042}
1043
1044static fiber_t *
1045ev_fiber_return_func(fiber_t *fiber, void *arg)
1046{
1047 struct ev_fiber_thrd *thr = &ev_fiber_thrd;
1048 (void)arg;
1049
1050 struct ev_fiber_ctx *ctx = thr->curr;
1051 thr->curr = NULL;
1052 ctx->fiber = fiber;
1053
1054 // Destroy the fiber or put it back on the unused list.
1055 ev_fiber_ctx_destroy(ctx);
1056
1057 return NULL;
1058}
1059
1060static fiber_t *
1061ev_fiber_mtx_lock_func(fiber_t *fiber, void *arg)
1062{
1063 struct ev_fiber_mtx_impl *impl = arg;
1064 assert(impl);
1065
1066 struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1067
1068 // Append the fiber to the queue of waiting fibers.
1069 sllist_push_back(&impl->queue, &ctx->task._node);
1070#if !LELY_NO_THREADS
1071 // Unlock the inner mutex locked in ev_fiber_mtx_lock().
1072 mtx_unlock(&impl->mtx);
1073#endif
1074
1075 return NULL;
1076}
1077
1078static fiber_t *
1079ev_fiber_cnd_wait_func(fiber_t *fiber, void *arg)
1080{
1081 struct ev_fiber_cnd_wait *wait = arg;
1082 assert(wait);
1083 ev_fiber_cnd_t *cond = wait->cond;
1084 assert(cond);
1085 struct ev_fiber_cnd_impl *cond_impl = cond->_impl;
1086 assert(cond_impl);
1087 ev_fiber_mtx_t *mtx = wait->mtx;
1088 struct ev_fiber_mtx_impl *mtx_impl = mtx->_impl;
1089 assert(mtx_impl);
1090
1091 struct ev_fiber_ctx *ctx = ev_fiber_resume(fiber);
1092
1093 wait->task = &ctx->task;
1094#if !LELY_NO_THREADS
1095 mtx_lock(&cond_impl->mtx);
1096#endif
1097 sllist_push_back(&cond_impl->queue, &wait->node);
1098#if !LELY_NO_THREADS
1099 mtx_unlock(&cond_impl->mtx);
1100#endif
1101
1102 // Unlock the mutex for the duration of the wait.
1103 mtx_impl->ctx = NULL;
1104 assert(mtx_impl->locked == 1);
1105 mtx_impl->locked = 0;
1106 struct ev_task *task =
1108 if (task)
1109 mtx_impl->locked++;
1110#if !LELY_NO_THREADS
1111 mtx_unlock(&mtx_impl->mtx);
1112#endif
1113 // cppcheck-suppress duplicateCondition
1114 if (task)
1115 ev_exec_post(task->exec, task);
1116
1117 return NULL;
1118}
1119
1120static void
1121ev_fiber_cnd_wake(struct slnode *node)
1122{
1123 assert(node);
1124 struct ev_fiber_cnd_wait *wait =
1126 ev_fiber_mtx_t *mtx = wait->mtx;
1127 assert(mtx);
1128 struct ev_fiber_mtx_impl *impl = mtx->_impl;
1129 assert(impl);
1130 struct ev_task *task = wait->task;
1131 assert(task);
1132
1133#if !LELY_NO_THREADS
1134 mtx_lock(&impl->mtx);
1135#endif
1136 if (impl->locked) {
1137 sllist_push_back(&impl->queue, &task->_node);
1138 task = NULL;
1139 } else {
1140 assert(sllist_empty(&impl->queue));
1141 impl->locked = 1;
1142 }
1143#if !LELY_NO_THREADS
1144 mtx_unlock(&impl->mtx);
1145#endif
1146
1147 if (task)
1148 ev_exec_post(task->exec, task);
1149}
1150
1151#endif // !LELY_NO_MALLOC
This header file is part of the utilities library; it contains the native and platform-independent er...
@ ERRNUM_PERM
Operation not permitted.
Definition: errnum.h:208
@ ERRNUM_INVAL
Invalid argument.
Definition: errnum.h:132
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:424
This header file is part of the event library; it contains the abstract task executor interface.
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:112
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread stor...
Definition: features.h:249
fiber_t * fiber_resume_with(fiber_t *fiber, fiber_func_t *func, void *arg)
Suspends the calling fiber and resumes the specified fiber, optionally executing a function before re...
Definition: fiber.c:430
#define FIBER_GUARD_STACK
A flag specifying a fiber to add a guard page when allocating the stack frame so that the kernel gene...
Definition: fiber.h:85
void * fiber_data(const fiber_t *fiber)
Returns a pointer to the data region of the specified fiber, or of the calling fiber if fiber is NULL...
Definition: fiber.c:416
int fiber_thrd_init(int flags)
Initializes the fiber associated with the calling thread.
Definition: fiber.c:210
fiber_t * fiber_create(fiber_func_t *func, void *arg, int flags, size_t data_size, size_t stack_size)
Creates a new fiber, allocates a stack and sets up a calling environment to begin executing the speci...
Definition: fiber.c:260
void fiber_thrd_fini(void)
Finalizes the fiber associated with the calling thread.
Definition: fiber.c:242
fiber_t * fiber_resume(fiber_t *fiber)
Equivalent to fiber_resume_with(fiber, NULL, NULL).
Definition: fiber.c:424
void fiber_destroy(fiber_t *fiber)
Destroys the specified fiber.
Definition: fiber.c:395
int ev_fiber_mtx_lock(ev_fiber_mtx_t *mtx)
Suspends the currently running fiber until it locks the fiber mutex at mtx.
Definition: fiber_exec.c:428
void ev_fiber_thrd_fini(void)
Finalizes the calling thread and prevents further use by fiber executors.
Definition: fiber_exec.c:232
int ev_fiber_mtx_init(ev_fiber_mtx_t *mtx, int type)
Creates a fiber mutex object with properties indicated by type, which must have one of the four value...
Definition: fiber_exec.c:378
int ev_fiber_thrd_init(int flags, size_t stack_size, size_t max_unused)
Initializes the calling thread for use by fiber executors.
Definition: fiber_exec.c:206
int ev_fiber_mtx_unlock(ev_fiber_mtx_t *mtx)
Unlocks the fiber mutex at mtx.
Definition: fiber_exec.c:525
void ev_fiber_cnd_destroy(ev_fiber_cnd_t *cond)
Releases all resources used by the fiber condition variable at cond.
Definition: fiber_exec.c:590
void ev_fiber_mtx_destroy(ev_fiber_mtx_t *mtx)
Releases any resources used by the fiber mutex at mtx.
Definition: fiber_exec.c:411
int ev_fiber_cnd_signal(ev_fiber_cnd_t *cond)
Unblocks one of the fibers that are blocked on the fiber condition variable at cond at the time of th...
Definition: fiber_exec.c:606
int ev_fiber_cnd_broadcast(ev_fiber_cnd_t *cond)
Unblocks all of the fibers that are blocked on the fiber condition variable at cond at the time of th...
Definition: fiber_exec.c:627
int ev_fiber_cnd_init(ev_fiber_cnd_t *cond)
Creates a fiber condition variable.
Definition: fiber_exec.c:566
void ev_fiber_exec_destroy(ev_exec_t *exec)
Destroys a fiber executor.
Definition: fiber_exec.c:352
void ev_fiber_await(ev_future_t *future)
Suspends a currently running fiber until the specified future becomes ready (or is cancelled).
Definition: fiber_exec.c:369
#define LELY_EV_FIBER_MAX_UNUSED
The maximum number of unused fibers per thread.
Definition: fiber_exec.c:43
int ev_fiber_cnd_wait(ev_fiber_cnd_t *cond, ev_fiber_mtx_t *mtx)
Atomically unlocks the fiber mutex at mtx and endeavors to block until the fiber condition variable a...
Definition: fiber_exec.c:652
int ev_fiber_mtx_trylock(ev_fiber_mtx_t *mtx)
Endeavors to lock the fiber mutex at mtx.
Definition: fiber_exec.c:476
ev_exec_t * ev_fiber_exec_get_inner_exec(const ev_exec_t *exec_)
Returns a pointer to the inner executor of a fiber executor.
Definition: fiber_exec.c:361
ev_exec_t * ev_fiber_exec_create(ev_exec_t *inner_exec)
Creates a fiber executor.
Definition: fiber_exec.c:325
This header file is part of the event library; it contains the fiber executor, mutex and condition va...
@ ev_fiber_nomem
Indicates that the requested operation failed because it was unable to allocate memory.
Definition: fiber_exec.h:62
@ ev_fiber_busy
Indicates that the requested operation failed because a resource requested by a test and return funct...
Definition: fiber_exec.h:57
@ ev_fiber_error
Indicates that the requested operation failed.
Definition: fiber_exec.h:47
@ ev_fiber_success
Indicates that the requested operation succeeded.
Definition: fiber_exec.h:45
@ ev_fiber_mtx_recursive
A fiber mutex type that supports recursive locking.
Definition: fiber_exec.h:74
void ev_future_submit(ev_future_t *future, struct ev_task *task)
Submits a task to be executed once the specified future is ready.
Definition: future.c:364
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
This is the internal header file of the event library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdint....
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
The implementation of a fiber condition variable.
Definition: fiber_exec.c:191
struct sllist queue
The queue of fibers waiting for the condition variable to be signaled.
Definition: fiber_exec.c:200
mtx_t mtx
The mutex protecting queue.
Definition: fiber_exec.c:194
A synchronization primitive (similar to the standard C11 condition variable) that can be used to bloc...
Definition: fiber_exec.h:92
The context of a wait operation on a fiber condition variable.
Definition: fiber_exec.c:177
struct slnode node
The node in the queue of fibers waiting on cond.
Definition: fiber_exec.c:179
ev_fiber_cnd_t * cond
A pointer to the condition variable passed to ev_fiber_cnd_wait().
Definition: fiber_exec.c:181
ev_fiber_mtx_t * mtx
A pointer to the mutex passed to ev_fiber_cnd_wait().
Definition: fiber_exec.c:183
struct ev_task * task
A pointer to the task (in ev_fiber_ctx) used to wake up the fiber.
Definition: fiber_exec.c:185
The context of a fiber used for executing tasks.
Definition: fiber_exec.c:130
struct sllist queue
The queue of deferred tasks.
Definition: fiber_exec.c:140
fiber_t * fiber
A pointer to the fiber containing this context.
Definition: fiber_exec.c:132
struct ev_fiber_ctx * next
A pointer to the next fiber in the list of unused fibers.
Definition: fiber_exec.c:134
struct ev_task task
The task used to resume the fiber.
Definition: fiber_exec.c:138
struct ev_fiber_exec * exec
The executor using this fiber.
Definition: fiber_exec.c:136
The implementation of a fiber executor.
Definition: fiber_exec.c:98
struct ev_fiber_thrd * thr
A pointer to the ev_fiber_thrd instance for this executor.
Definition: fiber_exec.c:106
ev_exec_t * inner_exec
A pointer to the inner executor.
Definition: fiber_exec.c:102
struct sllist queue
The queue of tasks submitted to this executor.
Definition: fiber_exec.c:116
int posted
A flag indicating whether task has been posted to inner_exec.
Definition: fiber_exec.c:114
struct ev_task task
The task used to create new fibers.
Definition: fiber_exec.c:104
const struct ev_exec_vtbl * exec_vptr
A pointer to the virtual table for the executor interface.
Definition: fiber_exec.c:100
mtx_t mtx
The mutex protecting posted and queue.
Definition: fiber_exec.c:111
size_t pending
The number of pending fibers available to execute a task.
Definition: fiber_exec.c:108
The implementation of a fiber mutex.
Definition: fiber_exec.c:156
int type
The type of mutex: ev_fiber_mtx_plain or ev_fiber_mtx_recursive.
Definition: fiber_exec.c:158
struct sllist queue
The queue of fibers waiting to acquire the lock.
Definition: fiber_exec.c:168
struct ev_fiber_ctx * ctx
A pointer to the fiber holding the lock.
Definition: fiber_exec.c:164
size_t locked
The number of times the mutex has been recursively locked.
Definition: fiber_exec.c:166
mtx_t mtx
The mutex protecting locked and queue.
Definition: fiber_exec.c:161
A synchronization primitive (similar to the standard C11 mutex) that can be used to protect shared da...
Definition: fiber_exec.h:82
The parameters used for creating fibers on this thread and the list of unused fibers.
Definition: fiber_exec.c:55
size_t max_unused
The maximum number of unused fibers for this thread.
Definition: fiber_exec.c:67
size_t num_unused
The number of unused fibers.
Definition: fiber_exec.c:71
size_t stack_size
The size (in bytes) of the stack frame allocated for each fiber.
Definition: fiber_exec.c:65
struct ev_fiber_ctx * curr
A pointer to the currently running fiber.
Definition: fiber_exec.c:73
struct ev_fiber_ctx * unused
The list of unused fibers.
Definition: fiber_exec.c:69
fiber_t * prev
A pointer to the previously running (suspended) fiber.
Definition: fiber_exec.c:75
size_t refcnt
The number of invocations of ev_fiber_thrd_init() minus the the number of invocation of ev_fiber_thrd...
Definition: fiber_exec.c:61
int flags
The flags used when creating each fiber.
Definition: fiber_exec.c:63
A future.
Definition: future.c:66
An executable task.
Definition: task.h:41
ev_task_func_t * func
The function to be invoked when the task is run.
Definition: task.h:45
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A fiber.
Definition: fiber.c:130
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
This header file is part of the event library; it contains the task declarations.
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
@ thrd_nomem
Indicates that the requested operation failed because it was unable to allocate memory.
Definition: threads.h:138
@ thrd_error
Indicates that the requested operation failed.
Definition: threads.h:123
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109