Lely core libraries 2.3.4
loop.c
Go to the documentation of this file.
1
24#include "ev.h"
25
26#if !LELY_NO_MALLOC
27
28#if !LELY_NO_THREADS
29#include <lely/libc/stdatomic.h>
30#include <lely/libc/threads.h>
31#endif
32#include <lely/ev/exec.h>
33#define LELY_EV_LOOP_INLINE extern inline
34#include <lely/ev/loop.h>
35#include <lely/ev/std_exec.h>
36#include <lely/ev/task.h>
37#include <lely/util/dllist.h>
38#include <lely/util/errnum.h>
39#include <lely/util/time.h>
40#include <lely/util/util.h>
41
42#include <assert.h>
43#include <stdint.h>
44#include <stdlib.h>
45
46#ifndef LELY_EV_LOOP_CTX_MAX_UNUSED
48#define LELY_EV_LOOP_CTX_MAX_UNUSED 16
49#endif
50
57 size_t refcnt;
63 struct ev_task task;
66#if !LELY_NO_THREADS
74 unsigned waiting : 1;
75#endif
77 unsigned ready : 1;
79 unsigned polling : 1;
81 void *thr;
83 struct dlnode node;
89};
90
91static void ev_loop_ctx_task_func(struct ev_task *task);
92
93static struct ev_loop_ctx *ev_loop_ctx_alloc(void);
94static void ev_loop_ctx_free(struct ev_loop_ctx *ctx);
95
96static void ev_loop_ctx_release(struct ev_loop_ctx *ctx);
97
98static struct ev_loop_ctx *ev_loop_ctx_create(
100static void ev_loop_ctx_destroy(struct ev_loop_ctx *ctx);
101
102static size_t ev_loop_ctx_wait_one(struct ev_loop_ctx **pctx, ev_loop_t *loop,
104static size_t ev_loop_ctx_wait_one_until(struct ev_loop_ctx **pctx,
106 const struct timespec *abs_time);
107
108static int ev_loop_ctx_kill(struct ev_loop_ctx *ctx, int stop);
109
116};
117
118#if LELY_NO_THREADS
119static struct ev_loop_thrd ev_loop_thrd = { 0, NULL };
120#else
121static _Thread_local struct ev_loop_thrd ev_loop_thrd = { 0, NULL };
122#endif
123
124static void ev_loop_std_exec_impl_on_task_init(ev_std_exec_impl_t *impl);
125static void ev_loop_std_exec_impl_on_task_fini(ev_std_exec_impl_t *impl);
126static void ev_loop_std_exec_impl_post(
127 ev_std_exec_impl_t *impl, struct ev_task *task);
128static size_t ev_loop_std_exec_impl_abort(
129 ev_std_exec_impl_t *impl, struct ev_task *task);
130
131// clang-format off
132static const struct ev_std_exec_impl_vtbl ev_loop_std_exec_impl_vtbl = {
133 &ev_loop_std_exec_impl_on_task_init,
134 &ev_loop_std_exec_impl_on_task_fini,
135 &ev_loop_std_exec_impl_post,
136 &ev_loop_std_exec_impl_abort
137};
138// clang-format on
139
141struct ev_loop {
148 size_t npoll;
156#if !LELY_NO_THREADS
159#endif
161 struct sllist queue;
163 struct ev_task task;
170#if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
171 size_t ntasks;
172#elif _WIN64 && !defined(__MINGW32__)
173 volatile LONGLONG ntasks;
174#elif _WIN32 && !defined(__MINGW32__)
175 volatile LONG ntasks;
176#else
177 atomic_size_t ntasks;
178#endif
181#if !LELY_NO_THREADS
184#endif
188 size_t npolling;
195 size_t nunused;
196};
197
198static inline ev_loop_t *ev_loop_from_impl(const ev_std_exec_impl_t *impl);
199
200static int ev_loop_empty(const ev_loop_t *loop);
201static size_t ev_loop_ntasks(const ev_loop_t *loop);
202
203static void ev_loop_do_stop(ev_loop_t *loop);
204
205static int ev_loop_kill_any(ev_loop_t *loop, int polling);
206
207void *
208ev_loop_alloc(void)
209{
210 void *ptr = malloc(sizeof(ev_loop_t));
211#if !LELY_NO_ERRNO
212 if (!ptr)
213 set_errc(errno2c(errno));
214#endif
215 return ptr;
216}
217
218void
219ev_loop_free(void *ptr)
220{
221 free(ptr);
222}
223
224ev_loop_t *
225ev_loop_init(ev_loop_t *loop, ev_poll_t *poll, size_t npoll, int poll_task)
226{
227 assert(loop);
228
229 loop->poll = poll;
230 loop->npoll = npoll;
231
232 loop->impl_vptr = &ev_loop_std_exec_impl_vtbl;
233 ev_std_exec_init(ev_loop_get_exec(loop), &loop->impl_vptr);
234
235#if !LELY_NO_THREADS
237 return NULL;
238#endif
239
241
242 loop->task = (struct ev_task)EV_TASK_INIT(NULL, NULL);
243 if (loop->poll && poll_task)
244 sllist_push_back(&loop->queue, &loop->task._node);
245
246#if LELY_NO_THREADS || LELY_NO_ATOMICS || (_WIN32 && !defined(__MINGW32__))
247 loop->ntasks = 0;
248#else
249 atomic_init(&loop->ntasks, 0);
250#endif
251
252 loop->stopped = 0;
253
254#if !LELY_NO_THREADS
255 dllist_init(&loop->waiting);
256#endif
257 dllist_init(&loop->polling);
258 loop->npolling = 0;
259
260 loop->unused = NULL;
261 loop->nunused = 0;
262
263 return loop;
264}
265
266void
267ev_loop_fini(ev_loop_t *loop)
268{
269 assert(loop);
270
271 while (loop->unused) {
272 struct ev_loop_ctx *ctx = loop->unused;
273 loop->unused = ctx->next;
274 ev_loop_ctx_free(ctx);
275 }
276
277 assert(!loop->npolling);
278 assert(dllist_empty(&loop->polling));
279#if !LELY_NO_THREADS
280 assert(dllist_empty(&loop->waiting));
281#endif
282
283#if !LELY_NO_THREADS
285#endif
286
287 ev_std_exec_fini(ev_loop_get_exec(loop));
288}
289
290ev_loop_t *
291ev_loop_create(ev_poll_t *poll, size_t npoll, int poll_task)
292{
293 int errc = 0;
294
295 ev_loop_t *loop = ev_loop_alloc();
296 if (!loop) {
297 errc = get_errc();
298 goto error_alloc;
299 }
300
301 ev_loop_t *tmp = ev_loop_init(loop, poll, npoll, poll_task);
302 if (!tmp) {
303 errc = get_errc();
304 goto error_init;
305 }
306 loop = tmp;
307
308 return loop;
309
310error_init:
311 ev_loop_free(loop);
312error_alloc:
313 set_errc(errc);
314 return NULL;
315}
316
317void
319{
320 if (loop) {
321 ev_loop_fini(loop);
322 ev_loop_free(loop);
323 }
324}
325
326ev_poll_t *
328{
329 assert(loop);
330
331 return loop->poll;
332}
333
334ev_exec_t *
336{
337 assert(loop);
338
339 return &loop->exec.exec_vptr;
340}
341
342void
344{
345 assert(loop);
346
347#if !LELY_NO_THREADS
348 mtx_lock(&loop->mtx);
349#endif
350 ev_loop_do_stop(loop);
351#if !LELY_NO_THREADS
353#endif
354}
355
356int
358{
359 assert(loop);
360
361#if !LELY_NO_THREADS
362 mtx_lock((mtx_t *)&loop->mtx);
363#endif
364 int stopped = loop->stopped;
365#if !LELY_NO_THREADS
366 mtx_unlock((mtx_t *)&loop->mtx);
367#endif
368 return stopped;
369}
370
371void
373{
374 assert(loop);
375
376#if !LELY_NO_THREADS
377 mtx_lock(&loop->mtx);
378#endif
379 loop->stopped = 0;
380#if !LELY_NO_THREADS
382#endif
383}
384
385size_t
387{
388 size_t n = 0;
389 struct ev_loop_ctx *ctx = NULL;
390 while (ev_loop_ctx_wait_one(&ctx, loop, future))
391 n += n < SIZE_MAX;
392 ev_loop_ctx_destroy(ctx);
393 return n;
394}
395
396size_t
398 const struct timespec *abs_time)
399{
400 size_t n = 0;
401 struct ev_loop_ctx *ctx = NULL;
402 while (ev_loop_ctx_wait_one_until(&ctx, loop, future, abs_time))
403 n += n < SIZE_MAX;
404 ev_loop_ctx_destroy(ctx);
405 return n;
406}
407
408size_t
410{
411 struct ev_loop_ctx *ctx = NULL;
412 size_t n = ev_loop_ctx_wait_one(&ctx, loop, future);
413 ev_loop_ctx_destroy(ctx);
414 return n;
415}
416
417size_t
419 const struct timespec *abs_time)
420{
421 struct ev_loop_ctx *ctx = NULL;
422 size_t n = ev_loop_ctx_wait_one_until(&ctx, loop, future, abs_time);
423 ev_loop_ctx_destroy(ctx);
424 return n;
425}
426
427void *
429{
430 return &ev_loop_thrd;
431}
432
433int
435{
436#if LELY_NO_THREADS
437 (void)loop;
438#else
439 assert(loop);
440#endif
441 struct ev_loop_thrd *thr = thr_;
442 assert(thr);
443
444 int result = 0;
445 int errc = get_errc();
446#if !LELY_NO_THREADS
447 mtx_lock(&loop->mtx);
448#endif
449 if (!thr->stopped) {
450 if (thr->ctx) {
451 if ((result = ev_loop_ctx_kill(thr->ctx, 1)) == -1)
452 errc = get_errc();
453 } else {
454 thr->stopped = 1;
455 }
456 }
457#if !LELY_NO_THREADS
458 mtx_unlock(&loop->mtx);
459#endif
460 set_errc(errc);
461 return result;
462}
463
464static void
465ev_loop_ctx_task_func(struct ev_task *task)
466{
467 assert(task);
468 struct ev_loop_ctx *ctx = structof(task, struct ev_loop_ctx, task);
469 ev_loop_t *loop = ctx->loop;
470 assert(loop);
471 assert(ctx->future);
472
473#if LELY_NO_THREADS
474 (void)loop;
475#else
476 mtx_lock(&loop->mtx);
477#endif
478 if (ctx->refcnt > 1 && !*ctx->pstopped && !ctx->ready) {
479 ctx->ready = 1;
480 ev_loop_ctx_kill(ctx, 0);
481 }
482#if !LELY_NO_THREADS
484#endif
485 ev_loop_ctx_release(ctx);
486}
487
488static struct ev_loop_ctx *
489ev_loop_ctx_alloc(void)
490{
491 int errc = 0;
492
493 struct ev_loop_ctx *ctx = malloc(sizeof(*ctx));
494 if (!ctx) {
495#if !LELY_NO_ERRNO
496 errc = errno2c(errno);
497#endif
498 goto error_malloc_ctx;
499 }
500
501 ctx->refcnt = 0;
502
503 ctx->loop = NULL;
504
505 ctx->future = NULL;
506 ctx->task = (struct ev_task)EV_TASK_INIT(NULL, &ev_loop_ctx_task_func);
507
508 ctx->pstopped = NULL;
509#if !LELY_NO_THREADS
510 if (cnd_init(&ctx->cond) != thrd_success) {
511 errc = get_errc();
512 goto error_init_cond;
513 }
514 ctx->waiting = 0;
515#endif
516 ctx->ready = 0;
517 ctx->polling = 0;
518 ctx->thr = NULL;
519
520 dlnode_init(&ctx->node);
521 ctx->next = NULL;
522
523 return ctx;
524
525#if !LELY_NO_THREADS
526 // cnd_destroy(&ctx->cond);
527error_init_cond:
528#endif
529 free(ctx);
530error_malloc_ctx:
531 set_errc(errc);
532 return NULL;
533}
534
535static void
536ev_loop_ctx_free(struct ev_loop_ctx *ctx)
537{
538 if (ctx) {
539#if !LELY_NO_THREADS
540 cnd_destroy(&ctx->cond);
541#endif
542 free(ctx);
543 }
544}
545
546static void
547ev_loop_ctx_release(struct ev_loop_ctx *ctx)
548{
549 assert(ctx);
550 ev_loop_t *loop = ctx->loop;
551 assert(loop);
552
553#if !LELY_NO_THREADS
554 mtx_lock(&loop->mtx);
555#endif
556 if (--ctx->refcnt) {
557#if !LELY_NO_THREADS
558 mtx_unlock(&loop->mtx);
559#endif
560 return;
561 }
562 ev_future_t *future = ctx->future;
563 ctx->future = NULL;
564#if !LELY_NO_THREADS
565 assert(!ctx->waiting);
566#endif
567 assert(!ctx->polling);
569 ctx->next = loop->unused;
570 loop->unused = ctx;
571 loop->nunused++;
572#if !LELY_NO_THREADS
573 mtx_unlock(&loop->mtx);
574#endif
575 ev_future_release(future);
576 } else {
577#if !LELY_NO_THREADS
578 mtx_unlock(&loop->mtx);
579#endif
580 ev_future_release(future);
581 ev_loop_ctx_free(ctx);
582 }
583}
584
585static struct ev_loop_ctx *
586ev_loop_ctx_create(ev_loop_t *loop, ev_future_t *future)
587{
588 assert(loop);
589
590#if !LELY_NO_THREADS
591 mtx_lock(&loop->mtx);
592#endif
593 struct ev_loop_ctx *ctx = loop->unused;
594 if (ctx) {
595 loop->unused = ctx->next;
596 assert(loop->nunused);
597 loop->nunused--;
598#if !LELY_NO_THREADS
600#endif
601 } else {
602#if !LELY_NO_THREADS
604#endif
605 ctx = ev_loop_ctx_alloc();
606 if (!ctx)
607 return NULL;
608 }
609
610 assert(!ctx->refcnt);
611 ctx->refcnt++;
612
613 ctx->loop = loop;
614
616 ctx->task = (struct ev_task)EV_TASK_INIT(
617 ev_loop_get_exec(loop), &ev_loop_ctx_task_func);
618
620
621#if !LELY_NO_THREADS
622 assert(!ctx->waiting);
623#endif
624 ctx->ready = 0;
625 assert(!ctx->polling);
626 ctx->thr = loop->poll ? ev_poll_self(loop->poll) : NULL;
627
628 dlnode_init(&ctx->node);
629
630 ctx->next = ev_loop_thrd.ctx;
631 ev_loop_thrd.ctx = ctx;
632
633 if (ctx->future) {
634 ctx->refcnt++;
635 ev_future_submit(ctx->future, &ctx->task);
636 }
637
638 return ctx;
639}
640
641static void
642ev_loop_ctx_destroy(struct ev_loop_ctx *ctx)
643{
644 if (ctx) {
645 if (ctx->future)
646 ev_future_cancel(ctx->future, &ctx->task);
647
648 assert(ev_loop_thrd.ctx == ctx);
649 ev_loop_thrd.ctx = ctx->next;
650
651 ev_loop_ctx_release(ctx);
652 }
653}
654
655static inline int
656ev_loop_can_poll(ev_loop_t *loop)
657{
658 return loop->poll && (!loop->npoll || loop->npolling < loop->npoll);
659}
660
661static size_t
662ev_loop_ctx_wait_one(
663 struct ev_loop_ctx **pctx, ev_loop_t *loop, ev_future_t *future)
664{
665 assert(pctx);
666 struct ev_loop_ctx *ctx = *pctx;
667 assert(loop);
668 assert(!ctx || ctx->loop == loop);
669
670 size_t n = 0;
671#if !LELY_NO_THREADS
672 mtx_lock(&loop->mtx);
673#endif
674 int poll_task = 0;
675 while (!loop->stopped && (!ctx || (!*ctx->pstopped && !ctx->ready))) {
676 // Stop the event loop if no more tasks remain or have been
677 // announced with ev_exec_on_task_init(), unless we should be
678 // waiting on a future but have not created the event loop
679 // context yet.
680 if (ev_loop_empty(loop) && !ev_loop_ntasks(loop)
681 && (!future || ctx)) {
682 ev_loop_do_stop(loop);
683 continue;
684 }
685 struct ev_task *task = ev_task_from_node(
686 sllist_pop_front(&loop->queue));
687 if (task && task == &loop->task) {
688 // The polling task is not a real task, but is part of
689 // the task queue for scheduling purposes.
690 poll_task = 1;
691 task = NULL;
692 }
693 if (task) {
694 // If a real task is available, execute it and return.
695#if !LELY_NO_THREADS
696 mtx_unlock(&loop->mtx);
697#endif
698 assert(task->exec);
699 ev_exec_run(task->exec, task);
700 n++;
701#if !LELY_NO_THREADS
702 mtx_lock(&loop->mtx);
703#endif
704 break;
705 }
706 if (!ctx) {
707 // Only create an event loop context when we have to
708 // poll or wait.
709#if !LELY_NO_THREADS
710 mtx_unlock(&loop->mtx);
711#endif
712 ctx = *pctx = ev_loop_ctx_create(loop, future);
713 if (!ctx) {
714#if !LELY_NO_THREADS
715 mtx_lock(&loop->mtx);
716#endif
717 break;
718 }
719#if !LELY_NO_THREADS
720 mtx_lock(&loop->mtx);
721#endif
722 // We released the lock, so a task may have been queued.
723 continue;
724 }
725 if (ev_loop_can_poll(loop)) {
726 ctx->polling = 1;
727 // Wake polling threads in LIFO order.
728 dllist_push_front(&loop->polling, &ctx->node);
729 loop->npolling++;
730 int empty = sllist_empty(&loop->queue);
731#if !LELY_NO_THREADS
732 mtx_unlock(&loop->mtx);
733#endif
734 int result = ev_poll_wait(loop->poll, empty ? -1 : 0);
735#if !LELY_NO_THREADS
736 mtx_lock(&loop->mtx);
737#endif
738 loop->npolling--;
739 dllist_remove(&loop->polling, &ctx->node);
740 ctx->polling = 0;
741 if (result == -1)
742 break;
743 } else if (!sllist_empty(&loop->queue)) {
744 continue;
745 } else {
746#if LELY_NO_THREADS
747 break;
748#else // !LELY_NO_THREADS
749 ctx->waiting = 1;
750 // Wake waiting threads in LIFO order.
751 dllist_push_front(&loop->waiting, &ctx->node);
752 int result = cnd_wait(&ctx->cond, &loop->mtx);
753 dllist_remove(&loop->waiting, &ctx->node);
754 ctx->waiting = 0;
755 if (result != thrd_success)
756 break;
757#endif // !LELY_NO_THREADS
758 }
759 }
760 int empty = sllist_empty(&loop->queue);
761 if (poll_task)
762 // Requeue the polling task.
763 sllist_push_back(&loop->queue, &loop->task._node);
764 if (!empty)
765 // If any real tasks remain on the queue, wake up any polling or
766 // non-polling thread.
767 ev_loop_kill_any(loop, 1);
768 else if (poll_task)
769 // Wake up any non-polling thread so it can start polling.
770 ev_loop_kill_any(loop, 0);
771 if (!n && ctx && *ctx->pstopped)
772 // Reset the thread-local flag used to stop an event loop with
773 // ev_loop_kill(), so it will resume on the next run funciton.
774 *ctx->pstopped = 0;
775#if !LELY_NO_THREADS
776 mtx_unlock(&loop->mtx);
777#endif
778 return n;
779}
780
781static size_t
782ev_loop_ctx_wait_one_until(struct ev_loop_ctx **pctx, ev_loop_t *loop,
783 ev_future_t *future, const struct timespec *abs_time)
784{
785 assert(pctx);
786 struct ev_loop_ctx *ctx = *pctx;
787 assert(loop);
788 assert(!ctx || ctx->loop == loop);
789#if LELY_NO_THREADS && LELY_NO_TIMEOUT
790 (void)abs_time;
791#endif
792
793 size_t n = 0;
794#if !LELY_NO_THREADS
795 mtx_lock(&loop->mtx);
796#endif
797 int poll_task = 0;
798 while (!loop->stopped && (!ctx || (!*ctx->pstopped && !ctx->ready))) {
799 // Stop the event loop if no more tasks remain or have been
800 // announced with ev_exec_on_task_init(), unless we should be
801 // waiting on a future but have not created the event loop
802 // context yet.
803 if (ev_loop_empty(loop) && !ev_loop_ntasks(loop)
804 && (!future || ctx)) {
805 ev_loop_do_stop(loop);
806 continue;
807 }
808 struct ev_task *task = ev_task_from_node(
809 sllist_pop_front(&loop->queue));
810 if (task && task == &loop->task) {
811 // The polling task is not a real task, but is part of
812 // the task queue for scheduling purposes.
813 poll_task = 1;
814 task = NULL;
815 }
816 if (task) {
817 // If a real task is available, execute it and return.
818#if !LELY_NO_THREADS
819 mtx_unlock(&loop->mtx);
820#endif
821 assert(task->exec);
822 ev_exec_run(task->exec, task);
823 n++;
824#if !LELY_NO_THREADS
825 mtx_lock(&loop->mtx);
826#endif
827 break;
828 }
829 if (!ctx) {
830 // Only create an event loop context when we have to
831 // poll or wait.
832#if !LELY_NO_THREADS
833 mtx_unlock(&loop->mtx);
834#endif
835 ctx = *pctx = ev_loop_ctx_create(loop, future);
836 if (!ctx) {
837#if !LELY_NO_THREADS
838 mtx_lock(&loop->mtx);
839#endif
840 break;
841 }
842#if !LELY_NO_THREADS
843 mtx_lock(&loop->mtx);
844#endif
845 // We released the lock, so a task may have been queued.
846 continue;
847 }
848 if (ev_loop_can_poll(loop)) {
849 ctx->polling = 1;
850 // Wake polling threads in LIFO order.
851 dllist_push_front(&loop->polling, &ctx->node);
852 loop->npolling++;
853#if !LELY_NO_TIMEOUT
854 int empty = sllist_empty(&loop->queue);
855#endif
856#if !LELY_NO_THREADS
857 mtx_unlock(&loop->mtx);
858#endif
859 int result = -1;
860 int64_t msec = 0;
861#if !LELY_NO_TIMEOUT
862 if (empty && abs_time) {
863 struct timespec now = { 0, 0 };
864 if (!timespec_get(&now, TIME_UTC))
865 goto error;
866 if (timespec_cmp(abs_time, &now) <= 0) {
868 goto error;
869 }
870 msec = timespec_diff_msec(abs_time, &now);
871 if (!msec)
872 // Prevent a busy loop due to rounding.
873 msec = 1;
874 else if (msec > INT_MAX)
875 msec = INT_MAX;
876 }
877#endif // !LELY_NO_TIMEOUT
878 result = ev_poll_wait(loop->poll, msec);
879#if !LELY_NO_TIMEOUT
880 error:
881#endif
882#if !LELY_NO_THREADS
883 mtx_lock(&loop->mtx);
884#endif
885 loop->npolling--;
886 dllist_remove(&loop->polling, &ctx->node);
887 ctx->polling = 0;
888 if (result == -1)
889 break;
890 } else if (!sllist_empty(&loop->queue)) {
891 continue;
892#if !LELY_NO_THREADS && !LELY_NO_TIMEOUT
893 } else if (abs_time) {
894 ctx->waiting = 1;
895 // Wake waiting threads in LIFO order.
896 dllist_push_front(&loop->waiting, &ctx->node);
897 int result = cnd_timedwait(
898 &ctx->cond, &loop->mtx, abs_time);
899 dllist_remove(&loop->waiting, &ctx->node);
900 ctx->waiting = 0;
901 if (result != thrd_success) {
902 if (result == thrd_timedout)
904 break;
905 }
906#endif // !LELY_NO_THREADS
907 } else {
908 break;
909 }
910 }
911 int empty = sllist_empty(&loop->queue);
912 if (poll_task)
913 // Requeue the polling task.
914 sllist_push_back(&loop->queue, &loop->task._node);
915 if (!empty)
916 // If any real tasks remain on the queue, wake up any polling or
917 // non-polling thread.
918 ev_loop_kill_any(loop, 1);
919 else if (poll_task)
920 // Wake up any non-polling thread so it can start polling.
921 ev_loop_kill_any(loop, 0);
922 if (!n && ctx && *ctx->pstopped)
923 // Reset the thread-local flag used to stop an event loop with
924 // ev_loop_kill(), so it will resume on the next run funciton.
925 *ctx->pstopped = 0;
926#if !LELY_NO_THREADS
927 mtx_unlock(&loop->mtx);
928#endif
929 return n;
930}
931
932static int
933ev_loop_ctx_kill(struct ev_loop_ctx *ctx, int stop)
934{
935 assert(ctx);
936 assert(ctx->loop);
937 assert(ctx->pstopped);
938 assert(!ctx->polling || ctx->loop->poll);
939
940 if (*ctx->pstopped)
941 return 0;
942 else if (stop)
943 *ctx->pstopped = 1;
944
945#if !LELY_NO_THREADS
946 if (ctx->waiting) {
947 cnd_signal(&ctx->cond);
948 return 0;
949 }
950#endif
951 return ctx->polling ? ev_poll_kill(ctx->loop->poll, ctx->thr) : 0;
952}
953
954static void
955ev_loop_std_exec_impl_on_task_init(ev_std_exec_impl_t *impl)
956{
957 ev_loop_t *loop = ev_loop_from_impl(impl);
958
959#if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
960 loop->ntasks++;
961#elif _WIN64 && !defined(__MINGW32__)
962 InterlockedIncrementNoFence64(&loop->ntasks);
963#elif _WIN32 && !defined(__MINGW32__)
964 InterlockedIncrementNoFence(&loop->ntasks);
965#else
967#endif
968}
969
970static void
971ev_loop_std_exec_impl_on_task_fini(ev_std_exec_impl_t *impl)
972{
973 ev_loop_t *loop = ev_loop_from_impl(impl);
974
975#if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
976 if (!--loop->ntasks) {
977#elif _WIN64 && !defined(__MINGW32__)
978 if (!InterlockedDecrementRelease64(&loop->ntasks)) {
979 MemoryBarrier();
980#elif _WIN32 && !defined(__MINGW32__)
981 if (!InterlockedDecrementRelease(&loop->ntasks)) {
982 MemoryBarrier();
983#else
985 == 1) {
987#endif
988#if !LELY_NO_THREADS
989 mtx_lock(&loop->mtx);
990#endif
991 if (ev_loop_empty(loop))
992 ev_loop_do_stop(loop);
993#if !LELY_NO_THREADS
994 mtx_unlock(&loop->mtx);
995#endif
996 }
997}
998
999static void
1000ev_loop_std_exec_impl_post(ev_std_exec_impl_t *impl, struct ev_task *task)
1001{
1002 ev_loop_t *loop = ev_loop_from_impl(impl);
1003 assert(task);
1004
1005#if !LELY_NO_THREADS
1006 mtx_lock(&loop->mtx);
1007#endif
1008 int empty = sllist_empty(&loop->queue);
1009 sllist_push_back(&loop->queue, &task->_node);
1010 if (empty)
1011 ev_loop_kill_any(loop, 1);
1012#if !LELY_NO_THREADS
1013 mtx_unlock(&loop->mtx);
1014#endif
1015}
1016
1017static size_t
1018ev_loop_std_exec_impl_abort(ev_std_exec_impl_t *impl, struct ev_task *task)
1019{
1020 ev_loop_t *loop = ev_loop_from_impl(impl);
1021
1022 struct sllist queue;
1023 sllist_init(&queue);
1024
1025#if !LELY_NO_THREADS
1026 mtx_lock(&loop->mtx);
1027#endif
1028 if (!task) {
1029 int poll_task = 0;
1030 struct slnode *node;
1031 while ((node = sllist_pop_front(&loop->queue))) {
1032 if (ev_task_from_node(node) == &loop->task)
1033 poll_task = 1;
1034 else
1035 sllist_push_back(&queue, node);
1036 }
1037 if (poll_task)
1038 sllist_push_back(&loop->queue, &loop->task._node);
1039 } else if (sllist_remove(&loop->queue, &task->_node)) {
1040 sllist_push_back(&queue, &task->_node);
1041 }
1042#if !LELY_NO_THREADS
1043 mtx_unlock(&loop->mtx);
1044#endif
1045
1046 size_t n = 0;
1047 while (sllist_pop_front(&queue))
1048 n += n < SIZE_MAX;
1049 return n;
1050}
1051
1052static inline ev_loop_t *
1053ev_loop_from_impl(const ev_std_exec_impl_t *impl)
1054{
1055 assert(impl);
1056
1057 return structof(impl, ev_loop_t, impl_vptr);
1058}
1059
1060static int
1061ev_loop_empty(const ev_loop_t *loop)
1062{
1063 assert(loop);
1064
1065 if (sllist_empty(&loop->queue))
1066 return 1;
1067 struct slnode *node = sllist_first(&loop->queue);
1068 // The task queue is considered empty if only the polling task remains.
1069 return node == &loop->task._node && node->next == NULL;
1070}
1071
1072static size_t
1073ev_loop_ntasks(const ev_loop_t *loop)
1074{
1075 assert(loop);
1076
1077#if LELY_NO_THREADS || LELY_NO_ATOMICS || (_WIN32 && !defined(__MINGW32__))
1078 return loop->ntasks;
1079#else
1080 return atomic_load_explicit(
1081 (atomic_size_t *)&loop->ntasks, memory_order_relaxed);
1082#endif
1083}
1084
1085static void
1086ev_loop_do_stop(ev_loop_t *loop)
1087{
1088 assert(loop);
1089
1090 if (!loop->stopped) {
1091 loop->stopped = 1;
1092#if !LELY_NO_THREADS
1093 dllist_foreach (&loop->waiting, node) {
1094 struct ev_loop_ctx *ctx = structof(
1095 node, struct ev_loop_ctx, node);
1096 ev_loop_ctx_kill(ctx, 1);
1097 }
1098#endif
1100 struct ev_loop_ctx *ctx = structof(
1101 node, struct ev_loop_ctx, node);
1102 ev_loop_ctx_kill(ctx, 1);
1103 }
1104 }
1105}
1106
1107static int
1108ev_loop_kill_any(ev_loop_t *loop, int polling)
1109{
1110 assert(loop);
1111
1112 struct dlnode *node;
1113#if !LELY_NO_THREADS
1114 if ((node = dllist_first(&loop->waiting))) {
1115 struct ev_loop_ctx *ctx =
1116 structof(node, struct ev_loop_ctx, node);
1117 return ev_loop_ctx_kill(ctx, 0);
1118 }
1119#endif
1120 if (polling && (node = dllist_first(&loop->polling))) {
1121 struct ev_loop_ctx *ctx =
1122 structof(node, struct ev_loop_ctx, node);
1123 return ev_loop_ctx_kill(ctx, 0);
1124 }
1125 return 0;
1126}
1127
1128#endif // !LELY_NO_MALLOC
This header file is part of the utilities library; it contains the doubly-linked list declarations.
void dllist_init(struct dllist *list)
Initializes a doubly-linked list.
Definition: dllist.h:281
int dllist_empty(const struct dllist *list)
Returns 1 if the doubly-linked list is empty, and 0 if not.
Definition: dllist.h:290
#define dllist_foreach(list, node)
Iterates in order over each node in a doubly-linked list.
Definition: dllist.h:232
void dllist_push_front(struct dllist *list, struct dlnode *node)
Pushes a node to the front of a doubly-linked list.
Definition: dllist.h:309
struct dlnode * dllist_first(const struct dllist *list)
Returns a pointer to the first node in a doubly-linked list.
Definition: dllist.h:420
void dlnode_init(struct dlnode *node)
Initializes a node in a doubly-linked list.
Definition: dllist.h:235
void dllist_remove(struct dllist *list, struct dlnode *node)
Removes a node from a doubly-linked list.
Definition: dllist.h:387
This header file is part of the utilities library; it contains the native and platform-independent er...
@ ERRNUM_TIMEDOUT
Connection timed out.
Definition: errnum.h:229
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:424
void * ev_poll_self(const ev_poll_t *poll)
Returns the identifier of the calling thread.
Definition: poll.h:69
const struct ev_poll_vtbl *const ev_poll_t
The abstract polling interface.
Definition: poll.h:32
int ev_poll_kill(ev_poll_t *poll, void *thr)
Interrupts a polling wait on the specified thread.
Definition: poll.h:81
int ev_poll_wait(ev_poll_t *poll, int timeout)
Waits for at most timeout milliseconds while polling for new events.
Definition: poll.h:75
This header file is part of the event library; it contains the abstract task executor interface.
void ev_exec_run(ev_exec_t *exec, struct ev_task *task)
Invokes the task function in *task as if the task is being executed by *exec.
Definition: exec.h:142
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread stor...
Definition: features.h:249
size_t ev_future_cancel(ev_future_t *future, struct ev_task *task)
Cancels the specified task submitted with ev_future_submit(), if it has not yet been scheduled for ex...
Definition: future.c:391
void ev_future_submit(ev_future_t *future, struct ev_task *task)
Submits a task to be executed once the specified future is ready.
Definition: future.c:364
void ev_future_release(ev_future_t *future)
Releases a reference to a future.
Definition: future.c:328
ev_future_t * ev_future_acquire(ev_future_t *future)
Acquires a reference to a future.
Definition: future.c:320
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
int timespec_get(struct timespec *ts, int base)
Sets the interval at ts to hold the current calendar time based on the specified time base.
Definition: time.c:32
#define TIME_UTC
An integer constant greater than 0 that designates the UTC time base.
Definition: time.h:207
ev_loop_t * ev_loop_create(ev_poll_t *poll, size_t npoll, int poll_task)
Creates a new polling event loop.
Definition: loop.c:291
size_t ev_loop_wait_one(ev_loop_t *loop, ev_future_t *future)
If the event loop has pending tasks, runs a single task.
Definition: loop.c:409
void ev_loop_destroy(ev_loop_t *loop)
Destroys a polling event loop.
Definition: loop.c:318
void * ev_loop_self(void)
Returns the identifier of the calling thread.
Definition: loop.c:428
#define LELY_EV_LOOP_CTX_MAX_UNUSED
The maximum number of unused contexts per event loop.
Definition: loop.c:48
int ev_loop_kill(ev_loop_t *loop, void *thr_)
Interrupts an event loop running on the specified thread.
Definition: loop.c:434
ev_poll_t * ev_loop_get_poll(const ev_loop_t *loop)
Returns a pointer to the polling instance used by the event loop, or NULL if the loop does not poll.
Definition: loop.c:327
ev_exec_t * ev_loop_get_exec(const ev_loop_t *loop)
Returns a pointer to the executor corresponding to the event loop.
Definition: loop.c:335
void ev_loop_stop(ev_loop_t *loop)
Stops the event loop.
Definition: loop.c:343
size_t ev_loop_wait(ev_loop_t *loop, ev_future_t *future)
Equivalent to.
Definition: loop.c:386
size_t ev_loop_wait_one_until(ev_loop_t *loop, ev_future_t *future, const struct timespec *abs_time)
If the event loop has pending tasks, runs a single task.
Definition: loop.c:418
size_t ev_loop_wait_until(ev_loop_t *loop, ev_future_t *future, const struct timespec *abs_time)
Equivalent to.
Definition: loop.c:397
int ev_loop_stopped(const ev_loop_t *loop)
Returns 1 if the event loop is stopped, and 0 if not.
Definition: loop.c:357
void ev_loop_restart(ev_loop_t *loop)
Restarts an event loop.
Definition: loop.c:372
This header file is part of the event library; it contains the polling event loop declarations.
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
struct slnode * sllist_first(const struct sllist *list)
Returns a pointer to the first node in a singly-linked list.
Definition: sllist.h:271
This is the internal header file of the event library.
This header file is part of the event library; it contains the standard executor declarations.
This header file is part of the C11 and POSIX compatibility library; it includes <stdatomic....
@ memory_order_release
A store operation performs a release operation on the affected memory location.
Definition: stdatomic.h:158
@ memory_order_relaxed
No operation orders memory.
Definition: stdatomic.h:131
@ memory_order_acquire
A load operation performs an acquire operation on the affected memory location.
Definition: stdatomic.h:149
#define atomic_fetch_add_explicit(object, operand, order)
Atomically replaces the value at object with *object + operand.
Definition: stdatomic.h:480
#define atomic_load_explicit(object, order)
Atomically returns the value at object.
Definition: stdatomic.h:344
void atomic_thread_fence(memory_order order)
Inserts a fence with semantics according to order.
Definition: stdatomic.h:617
#define atomic_init(obj, value)
Initializes the atomic object at obj with the value value.
Definition: stdatomic.h:222
#define atomic_fetch_sub_explicit(object, operand, order)
Atomically replaces the value at object with *object - operand.
Definition: stdatomic.h:504
This header file is part of the C11 and POSIX compatibility library; it includes <stdint....
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
A doubly-linked list.
Definition: dllist.h:54
A node in a doubly-linked list.
Definition: dllist.h:40
A future.
Definition: future.c:66
An event loop context.
Definition: loop.c:52
unsigned polling
A flag indicating if a thread is polling.
Definition: loop.c:79
unsigned ready
A flag indicating if future is ready.
Definition: loop.c:77
struct ev_loop_ctx * next
A pointer to the next context in the list of running or unused contexts.
Definition: loop.c:88
struct dlnode node
The node of this context in the list of waiting or polling contexts.
Definition: loop.c:83
ev_future_t * future
The future on which the loop is waiting.
Definition: loop.c:61
struct ev_task task
The task to be executed once the future is ready.
Definition: loop.c:63
void * thr
The thread identifier of the polling instance.
Definition: loop.c:81
cnd_t cond
The condition variable used by threads to wait for a task to be submitted to the event loop or for th...
Definition: loop.c:72
ev_loop_t * loop
A pointer to the event loop managing this context.
Definition: loop.c:59
size_t refcnt
The number of references to this context.
Definition: loop.c:57
unsigned waiting
A flag indicating if a thread is waiting on cond.
Definition: loop.c:74
int * pstopped
The address of the stopped flag of the thread.
Definition: loop.c:65
An event loop thread.
Definition: loop.c:111
struct ev_loop_ctx * ctx
A pointer to the event loop context for this thread.
Definition: loop.c:115
int stopped
A flag used to interrupt the event loop on this thread.
Definition: loop.c:113
A polling event loop.
Definition: loop.c:141
size_t npoll
The number of threads allowed to poll simultaneously.
Definition: loop.c:148
struct ev_task task
The task used to trigger polling.
Definition: loop.c:163
const struct ev_std_exec_impl_vtbl * impl_vptr
A pointer to the virtual table containing the interface used by the standard executor (exec).
Definition: loop.c:153
struct dllist polling
The list of polling contexts.
Definition: loop.c:186
struct dllist waiting
The list of waiting contexts.
Definition: loop.c:183
struct ev_std_exec exec
The executor corresponding to the event loop.
Definition: loop.c:155
struct ev_loop_ctx * unused
The list of unused contexts.
Definition: loop.c:190
atomic_size_t ntasks
The number of pending tasks.
Definition: loop.c:177
size_t nunused
The number of unused contexts.
Definition: loop.c:195
ev_poll_t * poll
A pointer to the interface used to poll for events (can be NULL).
Definition: loop.c:143
mtx_t mtx
The mutex protecting the task queue.
Definition: loop.c:158
int stopped
A flag specifying whether the event loop is stopped.
Definition: loop.c:180
size_t npolling
The number of polling contexts.
Definition: loop.c:188
struct sllist queue
The queue of pending tasks.
Definition: loop.c:161
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
struct slnode * next
A pointer to the next node in the list.
Definition: sllist.h:42
This header file is part of the event library; it contains the task declarations.
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int cnd_init(cnd_t *cond)
Creates a condition variable.
int cnd_timedwait(cnd_t *cond, mtx_t *mtx, const struct timespec *ts)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is si...
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
pthread_cond_t cnd_t
A complete object type that holds an identifier for a condition variable.
Definition: threads.h:78
void cnd_destroy(cnd_t *cond)
Releases all resources used by the condition variable at cond.
int cnd_wait(cnd_t *cond, mtx_t *mtx)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is si...
@ thrd_timedout
Indicates that the time specified in the call was reached without acquiring the requested resource.
Definition: threads.h:128
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
int cnd_signal(cnd_t *cond)
Unblocks one of the threads that are blocked on the condition variable at cond at the time of the cal...
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
This header file is part of the utilities library; it contains the time function declarations.
int timespec_cmp(const void *p1, const void *p2)
Compares two times.
Definition: time.h:251
int_least64_t timespec_diff_msec(const struct timespec *t1, const struct timespec *t2)
Returns the time difference (in milliseconds) between *t1 and *t2.
Definition: time.h:221