Lely core libraries  2.3.4
loop.c
Go to the documentation of this file.
1 
24 #include "ev.h"
25 
26 #if !LELY_NO_MALLOC
27 
28 #if !LELY_NO_THREADS
29 #include <lely/libc/stdatomic.h>
30 #include <lely/libc/threads.h>
31 #endif
32 #include <lely/ev/exec.h>
33 #define LELY_EV_LOOP_INLINE extern inline
34 #include <lely/ev/loop.h>
35 #include <lely/ev/std_exec.h>
36 #include <lely/ev/task.h>
37 #include <lely/util/dllist.h>
38 #include <lely/util/errnum.h>
39 #include <lely/util/time.h>
40 #include <lely/util/util.h>
41 
42 #include <assert.h>
43 #include <stdint.h>
44 #include <stdlib.h>
45 
// The maximum number of unused contexts cached per event loop. Contexts
// released beyond this limit are freed instead of being kept for reuse. The
// value can be overridden by defining the macro before this point.
#if !defined(LELY_EV_LOOP_CTX_MAX_UNUSED)
#define LELY_EV_LOOP_CTX_MAX_UNUSED 16
#endif
50 
52 struct ev_loop_ctx {
57  size_t refcnt;
63  struct ev_task task;
65  int *pstopped;
66 #if !LELY_NO_THREADS
74  unsigned waiting : 1;
75 #endif
77  unsigned ready : 1;
79  unsigned polling : 1;
81  void *thr;
83  struct dlnode node;
88  struct ev_loop_ctx *next;
89 };
90 
91 static void ev_loop_ctx_task_func(struct ev_task *task);
92 
93 static struct ev_loop_ctx *ev_loop_ctx_alloc(void);
94 static void ev_loop_ctx_free(struct ev_loop_ctx *ctx);
95 
96 static void ev_loop_ctx_release(struct ev_loop_ctx *ctx);
97 
98 static struct ev_loop_ctx *ev_loop_ctx_create(
100 static void ev_loop_ctx_destroy(struct ev_loop_ctx *ctx);
101 
102 static size_t ev_loop_ctx_wait_one(struct ev_loop_ctx **pctx, ev_loop_t *loop,
104 static size_t ev_loop_ctx_wait_one_until(struct ev_loop_ctx **pctx,
106  const struct timespec *abs_time);
107 
108 static int ev_loop_ctx_kill(struct ev_loop_ctx *ctx, int stop);
109 
// An event loop thread: the per-thread state shared by all event loops
// running on the same thread.
struct ev_loop_thrd {
	// Set by ev_loop_kill() to interrupt the loop running on this thread;
	// every context created on the thread points to this flag.
	int stopped;
	// The most recently created context on this thread (the head of the
	// list of running contexts linked through ev_loop_ctx::next).
	struct ev_loop_ctx *ctx;
};

// The state of the calling thread. Its address doubles as the thread
// identifier returned by ev_loop_self().
#if LELY_NO_THREADS
static struct ev_loop_thrd ev_loop_thrd = {
	.stopped = 0,
	.ctx = NULL,
};
#else
static _Thread_local struct ev_loop_thrd ev_loop_thrd = {
	.stopped = 0,
	.ctx = NULL,
};
#endif
123 
124 static void ev_loop_std_exec_impl_on_task_init(ev_std_exec_impl_t *impl);
125 static void ev_loop_std_exec_impl_on_task_fini(ev_std_exec_impl_t *impl);
126 static void ev_loop_std_exec_impl_post(
127  ev_std_exec_impl_t *impl, struct ev_task *task);
128 static size_t ev_loop_std_exec_impl_abort(
129  ev_std_exec_impl_t *impl, struct ev_task *task);
130 
131 // clang-format off
132 static const struct ev_std_exec_impl_vtbl ev_loop_std_exec_impl_vtbl = {
133  &ev_loop_std_exec_impl_on_task_init,
134  &ev_loop_std_exec_impl_on_task_fini,
135  &ev_loop_std_exec_impl_post,
136  &ev_loop_std_exec_impl_abort
137 };
138 // clang-format on
139 
141 struct ev_loop {
148  size_t npoll;
155  struct ev_std_exec exec;
156 #if !LELY_NO_THREADS
159 #endif
161  struct sllist queue;
163  struct ev_task task;
170 #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
171  size_t ntasks;
172 #elif _WIN64 && !defined(__MINGW32__)
173  volatile LONGLONG ntasks;
174 #elif _WIN32 && !defined(__MINGW32__)
175  volatile LONG ntasks;
176 #else
177  atomic_size_t ntasks;
178 #endif
180  int stopped;
181 #if !LELY_NO_THREADS
183  struct dllist waiting;
184 #endif
186  struct dllist polling;
188  size_t npolling;
195  size_t nunused;
196 };
197 
198 static inline ev_loop_t *ev_loop_from_impl(const ev_std_exec_impl_t *impl);
199 
200 static int ev_loop_empty(const ev_loop_t *loop);
201 static size_t ev_loop_ntasks(const ev_loop_t *loop);
202 
203 static void ev_loop_do_stop(ev_loop_t *loop);
204 
205 static int ev_loop_kill_any(ev_loop_t *loop, int polling);
206 
207 void *
208 ev_loop_alloc(void)
209 {
210  void *ptr = malloc(sizeof(ev_loop_t));
211 #if !LELY_NO_ERRNO
212  if (!ptr)
213  set_errc(errno2c(errno));
214 #endif
215  return ptr;
216 }
217 
// Releases storage obtained from ev_loop_alloc(). The loop must already have
// been finalized; passing NULL is harmless since free() ignores it.
void
ev_loop_free(void *ptr)
{
	free(ptr);
}
223 
224 ev_loop_t *
225 ev_loop_init(ev_loop_t *loop, ev_poll_t *poll, size_t npoll, int poll_task)
226 {
227  assert(loop);
228 
229  loop->poll = poll;
230  loop->npoll = npoll;
231 
232  loop->impl_vptr = &ev_loop_std_exec_impl_vtbl;
233  ev_std_exec_init(ev_loop_get_exec(loop), &loop->impl_vptr);
234 
235 #if !LELY_NO_THREADS
237  return NULL;
238 #endif
239 
241 
242  loop->task = (struct ev_task)EV_TASK_INIT(NULL, NULL);
243  if (loop->poll && poll_task)
244  sllist_push_back(&loop->queue, &loop->task._node);
245 
246 #if LELY_NO_THREADS || LELY_NO_ATOMICS || (_WIN32 && !defined(__MINGW32__))
247  loop->ntasks = 0;
248 #else
249  atomic_init(&loop->ntasks, 0);
250 #endif
251 
252  loop->stopped = 0;
253 
254 #if !LELY_NO_THREADS
255  dllist_init(&loop->waiting);
256 #endif
257  dllist_init(&loop->polling);
258  loop->npolling = 0;
259 
260  loop->unused = NULL;
261  loop->nunused = 0;
262 
263  return loop;
264 }
265 
266 void
267 ev_loop_fini(ev_loop_t *loop)
268 {
269  assert(loop);
270 
271  while (loop->unused) {
272  struct ev_loop_ctx *ctx = loop->unused;
273  loop->unused = ctx->next;
274  ev_loop_ctx_free(ctx);
275  }
276 
277  assert(!loop->npolling);
278  assert(dllist_empty(&loop->polling));
279 #if !LELY_NO_THREADS
280  assert(dllist_empty(&loop->waiting));
281 #endif
282 
283 #if !LELY_NO_THREADS
284  mtx_destroy(&loop->mtx);
285 #endif
286 
287  ev_std_exec_fini(ev_loop_get_exec(loop));
288 }
289 
290 ev_loop_t *
291 ev_loop_create(ev_poll_t *poll, size_t npoll, int poll_task)
292 {
293  int errc = 0;
294 
295  ev_loop_t *loop = ev_loop_alloc();
296  if (!loop) {
297  errc = get_errc();
298  goto error_alloc;
299  }
300 
301  ev_loop_t *tmp = ev_loop_init(loop, poll, npoll, poll_task);
302  if (!tmp) {
303  errc = get_errc();
304  goto error_init;
305  }
306  loop = tmp;
307 
308  return loop;
309 
310 error_init:
311  ev_loop_free(loop);
312 error_alloc:
313  set_errc(errc);
314  return NULL;
315 }
316 
317 void
319 {
320  if (loop) {
321  ev_loop_fini(loop);
322  ev_loop_free(loop);
323  }
324 }
325 
326 ev_poll_t *
328 {
329  assert(loop);
330 
331  return loop->poll;
332 }
333 
334 ev_exec_t *
336 {
337  assert(loop);
338 
339  return &loop->exec.exec_vptr;
340 }
341 
342 void
344 {
345  assert(loop);
346 
347 #if !LELY_NO_THREADS
348  mtx_lock(&loop->mtx);
349 #endif
350  ev_loop_do_stop(loop);
351 #if !LELY_NO_THREADS
352  mtx_unlock(&loop->mtx);
353 #endif
354 }
355 
356 int
358 {
359  assert(loop);
360 
361 #if !LELY_NO_THREADS
362  mtx_lock((mtx_t *)&loop->mtx);
363 #endif
364  int stopped = loop->stopped;
365 #if !LELY_NO_THREADS
366  mtx_unlock((mtx_t *)&loop->mtx);
367 #endif
368  return stopped;
369 }
370 
371 void
373 {
374  assert(loop);
375 
376 #if !LELY_NO_THREADS
377  mtx_lock(&loop->mtx);
378 #endif
379  loop->stopped = 0;
380 #if !LELY_NO_THREADS
381  mtx_unlock(&loop->mtx);
382 #endif
383 }
384 
385 size_t
387 {
388  size_t n = 0;
389  struct ev_loop_ctx *ctx = NULL;
390  while (ev_loop_ctx_wait_one(&ctx, loop, future))
391  n += n < SIZE_MAX;
392  ev_loop_ctx_destroy(ctx);
393  return n;
394 }
395 
396 size_t
398  const struct timespec *abs_time)
399 {
400  size_t n = 0;
401  struct ev_loop_ctx *ctx = NULL;
402  while (ev_loop_ctx_wait_one_until(&ctx, loop, future, abs_time))
403  n += n < SIZE_MAX;
404  ev_loop_ctx_destroy(ctx);
405  return n;
406 }
407 
408 size_t
410 {
411  struct ev_loop_ctx *ctx = NULL;
412  size_t n = ev_loop_ctx_wait_one(&ctx, loop, future);
413  ev_loop_ctx_destroy(ctx);
414  return n;
415 }
416 
417 size_t
419  const struct timespec *abs_time)
420 {
421  struct ev_loop_ctx *ctx = NULL;
422  size_t n = ev_loop_ctx_wait_one_until(&ctx, loop, future, abs_time);
423  ev_loop_ctx_destroy(ctx);
424  return n;
425 }
426 
427 void *
429 {
430  return &ev_loop_thrd;
431 }
432 
433 int
435 {
436 #if LELY_NO_THREADS
437  (void)loop;
438 #else
439  assert(loop);
440 #endif
441  struct ev_loop_thrd *thr = thr_;
442  assert(thr);
443 
444  int result = 0;
445  int errc = get_errc();
446 #if !LELY_NO_THREADS
447  mtx_lock(&loop->mtx);
448 #endif
449  if (!thr->stopped) {
450  if (thr->ctx) {
451  if ((result = ev_loop_ctx_kill(thr->ctx, 1)) == -1)
452  errc = get_errc();
453  } else {
454  thr->stopped = 1;
455  }
456  }
457 #if !LELY_NO_THREADS
458  mtx_unlock(&loop->mtx);
459 #endif
460  set_errc(errc);
461  return result;
462 }
463 
464 static void
465 ev_loop_ctx_task_func(struct ev_task *task)
466 {
467  assert(task);
468  struct ev_loop_ctx *ctx = structof(task, struct ev_loop_ctx, task);
469  ev_loop_t *loop = ctx->loop;
470  assert(loop);
471  assert(ctx->future);
472 
473 #if LELY_NO_THREADS
474  (void)loop;
475 #else
476  mtx_lock(&loop->mtx);
477 #endif
478  if (ctx->refcnt > 1 && !*ctx->pstopped && !ctx->ready) {
479  ctx->ready = 1;
480  ev_loop_ctx_kill(ctx, 0);
481  }
482 #if !LELY_NO_THREADS
483  mtx_unlock(&loop->mtx);
484 #endif
485  ev_loop_ctx_release(ctx);
486 }
487 
488 static struct ev_loop_ctx *
489 ev_loop_ctx_alloc(void)
490 {
491  int errc = 0;
492 
493  struct ev_loop_ctx *ctx = malloc(sizeof(*ctx));
494  if (!ctx) {
495 #if !LELY_NO_ERRNO
496  errc = errno2c(errno);
497 #endif
498  goto error_malloc_ctx;
499  }
500 
501  ctx->refcnt = 0;
502 
503  ctx->loop = NULL;
504 
505  ctx->future = NULL;
506  ctx->task = (struct ev_task)EV_TASK_INIT(NULL, &ev_loop_ctx_task_func);
507 
508  ctx->pstopped = NULL;
509 #if !LELY_NO_THREADS
510  if (cnd_init(&ctx->cond) != thrd_success) {
511  errc = get_errc();
512  goto error_init_cond;
513  }
514  ctx->waiting = 0;
515 #endif
516  ctx->ready = 0;
517  ctx->polling = 0;
518  ctx->thr = NULL;
519 
520  dlnode_init(&ctx->node);
521  ctx->next = NULL;
522 
523  return ctx;
524 
525 #if !LELY_NO_THREADS
526  // cnd_destroy(&ctx->cond);
527 error_init_cond:
528 #endif
529  free(ctx);
530 error_malloc_ctx:
531  set_errc(errc);
532  return NULL;
533 }
534 
535 static void
536 ev_loop_ctx_free(struct ev_loop_ctx *ctx)
537 {
538  if (ctx) {
539 #if !LELY_NO_THREADS
540  cnd_destroy(&ctx->cond);
541 #endif
542  free(ctx);
543  }
544 }
545 
546 static void
547 ev_loop_ctx_release(struct ev_loop_ctx *ctx)
548 {
549  assert(ctx);
550  ev_loop_t *loop = ctx->loop;
551  assert(loop);
552 
553 #if !LELY_NO_THREADS
554  mtx_lock(&loop->mtx);
555 #endif
556  if (--ctx->refcnt) {
557 #if !LELY_NO_THREADS
558  mtx_unlock(&loop->mtx);
559 #endif
560  return;
561  }
562  ev_future_t *future = ctx->future;
563  ctx->future = NULL;
564 #if !LELY_NO_THREADS
565  assert(!ctx->waiting);
566 #endif
567  assert(!ctx->polling);
568  if (loop->nunused < LELY_EV_LOOP_CTX_MAX_UNUSED) {
569  ctx->next = loop->unused;
570  loop->unused = ctx;
571  loop->nunused++;
572 #if !LELY_NO_THREADS
573  mtx_unlock(&loop->mtx);
574 #endif
575  ev_future_release(future);
576  } else {
577 #if !LELY_NO_THREADS
578  mtx_unlock(&loop->mtx);
579 #endif
580  ev_future_release(future);
581  ev_loop_ctx_free(ctx);
582  }
583 }
584 
585 static struct ev_loop_ctx *
586 ev_loop_ctx_create(ev_loop_t *loop, ev_future_t *future)
587 {
588  assert(loop);
589 
590 #if !LELY_NO_THREADS
591  mtx_lock(&loop->mtx);
592 #endif
593  struct ev_loop_ctx *ctx = loop->unused;
594  if (ctx) {
595  loop->unused = ctx->next;
596  assert(loop->nunused);
597  loop->nunused--;
598 #if !LELY_NO_THREADS
599  mtx_unlock(&loop->mtx);
600 #endif
601  } else {
602 #if !LELY_NO_THREADS
603  mtx_unlock(&loop->mtx);
604 #endif
605  ctx = ev_loop_ctx_alloc();
606  if (!ctx)
607  return NULL;
608  }
609 
610  assert(!ctx->refcnt);
611  ctx->refcnt++;
612 
613  ctx->loop = loop;
614 
616  ctx->task = (struct ev_task)EV_TASK_INIT(
617  ev_loop_get_exec(loop), &ev_loop_ctx_task_func);
618 
619  ctx->pstopped = &ev_loop_thrd.stopped;
620 
621 #if !LELY_NO_THREADS
622  assert(!ctx->waiting);
623 #endif
624  ctx->ready = 0;
625  assert(!ctx->polling);
626  ctx->thr = loop->poll ? ev_poll_self(loop->poll) : NULL;
627 
628  dlnode_init(&ctx->node);
629 
630  ctx->next = ev_loop_thrd.ctx;
631  ev_loop_thrd.ctx = ctx;
632 
633  if (ctx->future) {
634  ctx->refcnt++;
635  ev_future_submit(ctx->future, &ctx->task);
636  }
637 
638  return ctx;
639 }
640 
641 static void
642 ev_loop_ctx_destroy(struct ev_loop_ctx *ctx)
643 {
644  if (ctx) {
645  if (ctx->future)
646  ev_future_cancel(ctx->future, &ctx->task);
647 
648  assert(ev_loop_thrd.ctx == ctx);
649  ev_loop_thrd.ctx = ctx->next;
650 
651  ev_loop_ctx_release(ctx);
652  }
653 }
654 
655 static inline int
656 ev_loop_can_poll(ev_loop_t *loop)
657 {
658  return loop->poll && (!loop->npoll || loop->npolling < loop->npoll);
659 }
660 
661 static size_t
662 ev_loop_ctx_wait_one(
663  struct ev_loop_ctx **pctx, ev_loop_t *loop, ev_future_t *future)
664 {
665  assert(pctx);
666  struct ev_loop_ctx *ctx = *pctx;
667  assert(loop);
668  assert(!ctx || ctx->loop == loop);
669 
670  size_t n = 0;
671 #if !LELY_NO_THREADS
672  mtx_lock(&loop->mtx);
673 #endif
674  int poll_task = 0;
675  while (!loop->stopped && (!ctx || (!*ctx->pstopped && !ctx->ready))) {
676  // Stop the event loop if no more tasks remain or have been
677  // announced with ev_exec_on_task_init(), unless we should be
678  // waiting on a future but have not created the event loop
679  // context yet.
680  if (ev_loop_empty(loop) && !ev_loop_ntasks(loop)
681  && (!future || ctx)) {
682  ev_loop_do_stop(loop);
683  continue;
684  }
685  struct ev_task *task = ev_task_from_node(
686  sllist_pop_front(&loop->queue));
687  if (task && task == &loop->task) {
688  // The polling task is not a real task, but is part of
689  // the task queue for scheduling purposes.
690  poll_task = 1;
691  task = NULL;
692  }
693  if (task) {
694  // If a real task is available, execute it and return.
695 #if !LELY_NO_THREADS
696  mtx_unlock(&loop->mtx);
697 #endif
698  assert(task->exec);
699  ev_exec_run(task->exec, task);
700  n++;
701 #if !LELY_NO_THREADS
702  mtx_lock(&loop->mtx);
703 #endif
704  break;
705  }
706  if (!ctx) {
707  // Only create an event loop context when we have to
708  // poll or wait.
709 #if !LELY_NO_THREADS
710  mtx_unlock(&loop->mtx);
711 #endif
712  ctx = *pctx = ev_loop_ctx_create(loop, future);
713  if (!ctx) {
714 #if !LELY_NO_THREADS
715  mtx_lock(&loop->mtx);
716 #endif
717  break;
718  }
719 #if !LELY_NO_THREADS
720  mtx_lock(&loop->mtx);
721 #endif
722  // We released the lock, so a task may have been queued.
723  continue;
724  }
725  if (ev_loop_can_poll(loop)) {
726  ctx->polling = 1;
727  // Wake polling threads in LIFO order.
728  dllist_push_front(&loop->polling, &ctx->node);
729  loop->npolling++;
730  int empty = sllist_empty(&loop->queue);
731 #if !LELY_NO_THREADS
732  mtx_unlock(&loop->mtx);
733 #endif
734  int result = ev_poll_wait(loop->poll, empty ? -1 : 0);
735 #if !LELY_NO_THREADS
736  mtx_lock(&loop->mtx);
737 #endif
738  loop->npolling--;
739  dllist_remove(&loop->polling, &ctx->node);
740  ctx->polling = 0;
741  if (result == -1)
742  break;
743  } else if (!sllist_empty(&loop->queue)) {
744  continue;
745  } else {
746 #if LELY_NO_THREADS
747  break;
748 #else // !LELY_NO_THREADS
749  ctx->waiting = 1;
750  // Wake waiting threads in LIFO order.
751  dllist_push_front(&loop->waiting, &ctx->node);
752  int result = cnd_wait(&ctx->cond, &loop->mtx);
753  dllist_remove(&loop->waiting, &ctx->node);
754  ctx->waiting = 0;
755  if (result != thrd_success)
756  break;
757 #endif // !LELY_NO_THREADS
758  }
759  }
760  int empty = sllist_empty(&loop->queue);
761  if (poll_task)
762  // Requeue the polling task.
763  sllist_push_back(&loop->queue, &loop->task._node);
764  if (!empty)
765  // If any real tasks remain on the queue, wake up any polling or
766  // non-polling thread.
767  ev_loop_kill_any(loop, 1);
768  else if (poll_task)
769  // Wake up any non-polling thread so it can start polling.
770  ev_loop_kill_any(loop, 0);
771  if (!n && ctx && *ctx->pstopped)
772  // Reset the thread-local flag used to stop an event loop with
773  // ev_loop_kill(), so it will resume on the next run funciton.
774  *ctx->pstopped = 0;
775 #if !LELY_NO_THREADS
776  mtx_unlock(&loop->mtx);
777 #endif
778  return n;
779 }
780 
781 static size_t
782 ev_loop_ctx_wait_one_until(struct ev_loop_ctx **pctx, ev_loop_t *loop,
783  ev_future_t *future, const struct timespec *abs_time)
784 {
785  assert(pctx);
786  struct ev_loop_ctx *ctx = *pctx;
787  assert(loop);
788  assert(!ctx || ctx->loop == loop);
789 #if LELY_NO_THREADS && LELY_NO_TIMEOUT
790  (void)abs_time;
791 #endif
792 
793  size_t n = 0;
794 #if !LELY_NO_THREADS
795  mtx_lock(&loop->mtx);
796 #endif
797  int poll_task = 0;
798  while (!loop->stopped && (!ctx || (!*ctx->pstopped && !ctx->ready))) {
799  // Stop the event loop if no more tasks remain or have been
800  // announced with ev_exec_on_task_init(), unless we should be
801  // waiting on a future but have not created the event loop
802  // context yet.
803  if (ev_loop_empty(loop) && !ev_loop_ntasks(loop)
804  && (!future || ctx)) {
805  ev_loop_do_stop(loop);
806  continue;
807  }
808  struct ev_task *task = ev_task_from_node(
809  sllist_pop_front(&loop->queue));
810  if (task && task == &loop->task) {
811  // The polling task is not a real task, but is part of
812  // the task queue for scheduling purposes.
813  poll_task = 1;
814  task = NULL;
815  }
816  if (task) {
817  // If a real task is available, execute it and return.
818 #if !LELY_NO_THREADS
819  mtx_unlock(&loop->mtx);
820 #endif
821  assert(task->exec);
822  ev_exec_run(task->exec, task);
823  n++;
824 #if !LELY_NO_THREADS
825  mtx_lock(&loop->mtx);
826 #endif
827  break;
828  }
829  if (!ctx) {
830  // Only create an event loop context when we have to
831  // poll or wait.
832 #if !LELY_NO_THREADS
833  mtx_unlock(&loop->mtx);
834 #endif
835  ctx = *pctx = ev_loop_ctx_create(loop, future);
836  if (!ctx) {
837 #if !LELY_NO_THREADS
838  mtx_lock(&loop->mtx);
839 #endif
840  break;
841  }
842 #if !LELY_NO_THREADS
843  mtx_lock(&loop->mtx);
844 #endif
845  // We released the lock, so a task may have been queued.
846  continue;
847  }
848  if (ev_loop_can_poll(loop)) {
849  ctx->polling = 1;
850  // Wake polling threads in LIFO order.
851  dllist_push_front(&loop->polling, &ctx->node);
852  loop->npolling++;
853 #if !LELY_NO_TIMEOUT
854  int empty = sllist_empty(&loop->queue);
855 #endif
856 #if !LELY_NO_THREADS
857  mtx_unlock(&loop->mtx);
858 #endif
859  int result = -1;
860  int64_t msec = 0;
861 #if !LELY_NO_TIMEOUT
862  if (empty && abs_time) {
863  struct timespec now = { 0, 0 };
864  if (!timespec_get(&now, TIME_UTC))
865  goto error;
866  if (timespec_cmp(abs_time, &now) <= 0) {
868  goto error;
869  }
870  msec = timespec_diff_msec(abs_time, &now);
871  if (!msec)
872  // Prevent a busy loop due to rounding.
873  msec = 1;
874  else if (msec > INT_MAX)
875  msec = INT_MAX;
876  }
877 #endif // !LELY_NO_TIMEOUT
878  result = ev_poll_wait(loop->poll, msec);
879 #if !LELY_NO_TIMEOUT
880  error:
881 #endif
882 #if !LELY_NO_THREADS
883  mtx_lock(&loop->mtx);
884 #endif
885  loop->npolling--;
886  dllist_remove(&loop->polling, &ctx->node);
887  ctx->polling = 0;
888  if (result == -1)
889  break;
890  } else if (!sllist_empty(&loop->queue)) {
891  continue;
892 #if !LELY_NO_THREADS && !LELY_NO_TIMEOUT
893  } else if (abs_time) {
894  ctx->waiting = 1;
895  // Wake waiting threads in LIFO order.
896  dllist_push_front(&loop->waiting, &ctx->node);
897  int result = cnd_timedwait(
898  &ctx->cond, &loop->mtx, abs_time);
899  dllist_remove(&loop->waiting, &ctx->node);
900  ctx->waiting = 0;
901  if (result != thrd_success) {
902  if (result == thrd_timedout)
904  break;
905  }
906 #endif // !LELY_NO_THREADS
907  } else {
908  break;
909  }
910  }
911  int empty = sllist_empty(&loop->queue);
912  if (poll_task)
913  // Requeue the polling task.
914  sllist_push_back(&loop->queue, &loop->task._node);
915  if (!empty)
916  // If any real tasks remain on the queue, wake up any polling or
917  // non-polling thread.
918  ev_loop_kill_any(loop, 1);
919  else if (poll_task)
920  // Wake up any non-polling thread so it can start polling.
921  ev_loop_kill_any(loop, 0);
922  if (!n && ctx && *ctx->pstopped)
923  // Reset the thread-local flag used to stop an event loop with
924  // ev_loop_kill(), so it will resume on the next run funciton.
925  *ctx->pstopped = 0;
926 #if !LELY_NO_THREADS
927  mtx_unlock(&loop->mtx);
928 #endif
929  return n;
930 }
931 
932 static int
933 ev_loop_ctx_kill(struct ev_loop_ctx *ctx, int stop)
934 {
935  assert(ctx);
936  assert(ctx->loop);
937  assert(ctx->pstopped);
938  assert(!ctx->polling || ctx->loop->poll);
939 
940  if (*ctx->pstopped)
941  return 0;
942  else if (stop)
943  *ctx->pstopped = 1;
944 
945 #if !LELY_NO_THREADS
946  if (ctx->waiting) {
947  cnd_signal(&ctx->cond);
948  return 0;
949  }
950 #endif
951  return ctx->polling ? ev_poll_kill(ctx->loop->poll, ctx->thr) : 0;
952 }
953 
954 static void
955 ev_loop_std_exec_impl_on_task_init(ev_std_exec_impl_t *impl)
956 {
957  ev_loop_t *loop = ev_loop_from_impl(impl);
958 
959 #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
960  loop->ntasks++;
961 #elif _WIN64 && !defined(__MINGW32__)
962  InterlockedIncrementNoFence64(&loop->ntasks);
963 #elif _WIN32 && !defined(__MINGW32__)
964  InterlockedIncrementNoFence(&loop->ntasks);
965 #else
967 #endif
968 }
969 
970 static void
971 ev_loop_std_exec_impl_on_task_fini(ev_std_exec_impl_t *impl)
972 {
973  ev_loop_t *loop = ev_loop_from_impl(impl);
974 
975 #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
976  if (!--loop->ntasks) {
977 #elif _WIN64 && !defined(__MINGW32__)
978  if (!InterlockedDecrementRelease64(&loop->ntasks)) {
979  MemoryBarrier();
980 #elif _WIN32 && !defined(__MINGW32__)
981  if (!InterlockedDecrementRelease(&loop->ntasks)) {
982  MemoryBarrier();
983 #else
985  == 1) {
987 #endif
988 #if !LELY_NO_THREADS
989  mtx_lock(&loop->mtx);
990 #endif
991  if (ev_loop_empty(loop))
992  ev_loop_do_stop(loop);
993 #if !LELY_NO_THREADS
994  mtx_unlock(&loop->mtx);
995 #endif
996  }
997 }
998 
999 static void
1000 ev_loop_std_exec_impl_post(ev_std_exec_impl_t *impl, struct ev_task *task)
1001 {
1002  ev_loop_t *loop = ev_loop_from_impl(impl);
1003  assert(task);
1004 
1005 #if !LELY_NO_THREADS
1006  mtx_lock(&loop->mtx);
1007 #endif
1008  int empty = sllist_empty(&loop->queue);
1009  sllist_push_back(&loop->queue, &task->_node);
1010  if (empty)
1011  ev_loop_kill_any(loop, 1);
1012 #if !LELY_NO_THREADS
1013  mtx_unlock(&loop->mtx);
1014 #endif
1015 }
1016 
1017 static size_t
1018 ev_loop_std_exec_impl_abort(ev_std_exec_impl_t *impl, struct ev_task *task)
1019 {
1020  ev_loop_t *loop = ev_loop_from_impl(impl);
1021 
1022  struct sllist queue;
1023  sllist_init(&queue);
1024 
1025 #if !LELY_NO_THREADS
1026  mtx_lock(&loop->mtx);
1027 #endif
1028  if (!task) {
1029  int poll_task = 0;
1030  struct slnode *node;
1031  while ((node = sllist_pop_front(&loop->queue))) {
1032  if (ev_task_from_node(node) == &loop->task)
1033  poll_task = 1;
1034  else
1035  sllist_push_back(&queue, node);
1036  }
1037  if (poll_task)
1038  sllist_push_back(&loop->queue, &loop->task._node);
1039  } else if (sllist_remove(&loop->queue, &task->_node)) {
1040  sllist_push_back(&queue, &task->_node);
1041  }
1042 #if !LELY_NO_THREADS
1043  mtx_unlock(&loop->mtx);
1044 #endif
1045 
1046  size_t n = 0;
1047  while (sllist_pop_front(&queue))
1048  n += n < SIZE_MAX;
1049  return n;
1050 }
1051 
1052 static inline ev_loop_t *
1053 ev_loop_from_impl(const ev_std_exec_impl_t *impl)
1054 {
1055  assert(impl);
1056 
1057  return structof(impl, ev_loop_t, impl_vptr);
1058 }
1059 
1060 static int
1061 ev_loop_empty(const ev_loop_t *loop)
1062 {
1063  assert(loop);
1064 
1065  if (sllist_empty(&loop->queue))
1066  return 1;
1067  struct slnode *node = sllist_first(&loop->queue);
1068  // The task queue is considered empty if only the polling task remains.
1069  return node == &loop->task._node && node->next == NULL;
1070 }
1071 
1072 static size_t
1073 ev_loop_ntasks(const ev_loop_t *loop)
1074 {
1075  assert(loop);
1076 
1077 #if LELY_NO_THREADS || LELY_NO_ATOMICS || (_WIN32 && !defined(__MINGW32__))
1078  return loop->ntasks;
1079 #else
1080  return atomic_load_explicit(
1081  (atomic_size_t *)&loop->ntasks, memory_order_relaxed);
1082 #endif
1083 }
1084 
1085 static void
1086 ev_loop_do_stop(ev_loop_t *loop)
1087 {
1088  assert(loop);
1089 
1090  if (!loop->stopped) {
1091  loop->stopped = 1;
1092 #if !LELY_NO_THREADS
1093  dllist_foreach (&loop->waiting, node) {
1094  struct ev_loop_ctx *ctx = structof(
1095  node, struct ev_loop_ctx, node);
1096  ev_loop_ctx_kill(ctx, 1);
1097  }
1098 #endif
1100  struct ev_loop_ctx *ctx = structof(
1101  node, struct ev_loop_ctx, node);
1102  ev_loop_ctx_kill(ctx, 1);
1103  }
1104  }
1105 }
1106 
1107 static int
1108 ev_loop_kill_any(ev_loop_t *loop, int polling)
1109 {
1110  assert(loop);
1111 
1112  struct dlnode *node;
1113 #if !LELY_NO_THREADS
1114  if ((node = dllist_first(&loop->waiting))) {
1115  struct ev_loop_ctx *ctx =
1116  structof(node, struct ev_loop_ctx, node);
1117  return ev_loop_ctx_kill(ctx, 0);
1118  }
1119 #endif
1120  if (polling && (node = dllist_first(&loop->polling))) {
1121  struct ev_loop_ctx *ctx =
1122  structof(node, struct ev_loop_ctx, node);
1123  return ev_loop_ctx_kill(ctx, 0);
1124  }
1125  return 0;
1126 }
1127 
1128 #endif // !LELY_NO_MALLOC
This header file is part of the utilities library; it contains the doubly-linked list declarations.
void dllist_init(struct dllist *list)
Initializes a doubly-linked list.
Definition: dllist.h:281
int dllist_empty(const struct dllist *list)
Returns 1 if the doubly-linked list is empty, and 0 if not.
Definition: dllist.h:290
#define dllist_foreach(list, node)
Iterates in order over each node in a doubly-linked list.
Definition: dllist.h:232
void dllist_push_front(struct dllist *list, struct dlnode *node)
Pushes a node to the front of a doubly-linked list.
Definition: dllist.h:309
struct dlnode * dllist_first(const struct dllist *list)
Returns a pointer to the first node in a doubly-linked list.
Definition: dllist.h:420
void dlnode_init(struct dlnode *node)
Initializes a node in a doubly-linked list.
Definition: dllist.h:235
void dllist_remove(struct dllist *list, struct dlnode *node)
Removes a node from a doubly-linked list.
Definition: dllist.h:387
This header file is part of the utilities library; it contains the native and platform-independent er...
@ ERRNUM_TIMEDOUT
Connection timed out.
Definition: errnum.h:229
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:424
void * ev_poll_self(const ev_poll_t *poll)
Returns the identifier of the calling thread.
Definition: poll.h:69
const struct ev_poll_vtbl *const ev_poll_t
The abstract polling interface.
Definition: poll.h:32
int ev_poll_kill(ev_poll_t *poll, void *thr)
Interrupts a polling wait on the specified thread.
Definition: poll.h:81
int ev_poll_wait(ev_poll_t *poll, int timeout)
Waits for at most timeout milliseconds while polling for new events.
Definition: poll.h:75
This header file is part of the event library; it contains the abstract task executor interface.
void ev_exec_run(ev_exec_t *exec, struct ev_task *task)
Invokes the task function in *task as if the task is being executed by *exec.
Definition: exec.h:142
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread storage duration.
Definition: features.h:249
size_t ev_future_cancel(ev_future_t *future, struct ev_task *task)
Cancels the specified task submitted with ev_future_submit(), if it has not yet been scheduled for execution.
Definition: future.c:391
void ev_future_submit(ev_future_t *future, struct ev_task *task)
Submits a task to be executed once the specified future is ready.
Definition: future.c:364
ev_future_t * ev_future_acquire(ev_future_t *future)
Acquires a reference to a future.
Definition: future.c:320
void ev_future_release(ev_future_t *future)
Releases a reference to a future.
Definition: future.c:328
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
int timespec_get(struct timespec *ts, int base)
Sets the interval at ts to hold the current calendar time based on the specified time base.
Definition: time.c:32
#define TIME_UTC
An integer constant greater than 0 that designates the UTC time base.
Definition: time.h:207
ev_exec_t * ev_loop_get_exec(const ev_loop_t *loop)
Returns a pointer to the executor corresponding to the event loop.
Definition: loop.c:335
size_t ev_loop_wait_one(ev_loop_t *loop, ev_future_t *future)
If the event loop has pending tasks, runs a single task.
Definition: loop.c:409
void ev_loop_destroy(ev_loop_t *loop)
Destroys a polling event loop.
Definition: loop.c:318
ev_poll_t * ev_loop_get_poll(const ev_loop_t *loop)
Returns a pointer to the polling instance used by the event loop, or NULL if the loop does not poll.
Definition: loop.c:327
#define LELY_EV_LOOP_CTX_MAX_UNUSED
The maximum number of unused contexts per event loop.
Definition: loop.c:48
ev_loop_t * ev_loop_create(ev_poll_t *poll, size_t npoll, int poll_task)
Creates a new polling event loop.
Definition: loop.c:291
void * ev_loop_self(void)
Returns the identifier of the calling thread.
Definition: loop.c:428
int ev_loop_kill(ev_loop_t *loop, void *thr_)
Interrupts an event loop running on the specified thread.
Definition: loop.c:434
void ev_loop_stop(ev_loop_t *loop)
Stops the event loop.
Definition: loop.c:343
size_t ev_loop_wait(ev_loop_t *loop, ev_future_t *future)
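Equivalent to calling ev_loop_wait_one() in a loop until it returns 0; returns the number of tasks executed (saturating at SIZE_MAX).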
Definition: loop.c:386
size_t ev_loop_wait_one_until(ev_loop_t *loop, ev_future_t *future, const struct timespec *abs_time)
If the event loop has pending tasks, runs a single task.
Definition: loop.c:418
size_t ev_loop_wait_until(ev_loop_t *loop, ev_future_t *future, const struct timespec *abs_time)
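Equivalent to calling ev_loop_wait_one_until() in a loop until it returns 0; returns the number of tasks executed (saturating at SIZE_MAX).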
Definition: loop.c:397
int ev_loop_stopped(const ev_loop_t *loop)
Returns 1 if the event loop is stopped, and 0 if not.
Definition: loop.c:357
void ev_loop_restart(ev_loop_t *loop)
Restarts an event loop.
Definition: loop.c:372
This header file is part of the event library; it contains the polling event loop declarations.
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
struct slnode * sllist_first(const struct sllist *list)
Returns a pointer to the first node in a singly-linked list.
Definition: sllist.h:271
This is the internal header file of the event library.
This header file is part of the event library; it contains the standard executor declarations.
This header file is part of the C11 and POSIX compatibility library; it includes <stdatomic....
@ memory_order_release
A store operation performs a release operation on the affected memory location.
Definition: stdatomic.h:158
@ memory_order_relaxed
No operation orders memory.
Definition: stdatomic.h:131
@ memory_order_acquire
A load operation performs an acquire operation on the affected memory location.
Definition: stdatomic.h:149
#define atomic_fetch_add_explicit(object, operand, order)
Atomically replaces the value at object with *object + operand.
Definition: stdatomic.h:480
#define atomic_load_explicit(object, order)
Atomically returns the value at object.
Definition: stdatomic.h:344
void atomic_thread_fence(memory_order order)
Inserts a fence with semantics according to order.
Definition: stdatomic.h:617
#define atomic_init(obj, value)
Initializes the atomic object at obj with the value value.
Definition: stdatomic.h:222
#define atomic_fetch_sub_explicit(object, operand, order)
Atomically replaces the value at object with *object - operand.
Definition: stdatomic.h:504
This header file is part of the C11 and POSIX compatibility library; it includes <stdint.h> and defines any missing functionality.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib.h> and defines any missing functionality.
A doubly-linked list.
Definition: dllist.h:54
A node in a doubly-linked list.
Definition: dllist.h:40
A future.
Definition: future.c:66
An event loop context.
Definition: loop.c:52
unsigned polling
A flag indicating if a thread is polling.
Definition: loop.c:79
unsigned ready
A flag indicating if future is ready.
Definition: loop.c:77
struct ev_loop_ctx * next
A pointer to the next context in the list of running or unused contexts.
Definition: loop.c:88
struct dlnode node
The node of this context in the list of waiting or polling contexts.
Definition: loop.c:83
ev_future_t * future
The future on which the loop is waiting.
Definition: loop.c:61
struct ev_task task
The task to be executed once the future is ready.
Definition: loop.c:63
void * thr
The thread identifier of the polling instance.
Definition: loop.c:81
cnd_t cond
The condition variable used by threads to wait for a task to be submitted to the event loop or for the future to become ready.
Definition: loop.c:72
ev_loop_t * loop
A pointer to the event loop managing this context.
Definition: loop.c:59
size_t refcnt
The number of references to this context.
Definition: loop.c:57
unsigned waiting
A flag indicating if a thread is waiting on cond.
Definition: loop.c:74
int * pstopped
The address of the stopped flag of the thread.
Definition: loop.c:65
An event loop thread.
Definition: loop.c:111
struct ev_loop_ctx * ctx
A pointer to the event loop context for this thread.
Definition: loop.c:115
int stopped
A flag used to interrupt the event loop on this thread.
Definition: loop.c:113
A polling event loop.
Definition: loop.c:141
size_t npoll
The number of threads allowed to poll simultaneously.
Definition: loop.c:148
struct ev_task task
The task used to trigger polling.
Definition: loop.c:163
const struct ev_std_exec_impl_vtbl * impl_vptr
A pointer to the virtual table containing the interface used by the standard executor (exec).
Definition: loop.c:153
struct dllist polling
The list of polling contexts.
Definition: loop.c:186
struct dllist waiting
The list of waiting contexts.
Definition: loop.c:183
struct ev_std_exec exec
The executor corresponding to the event loop.
Definition: loop.c:155
struct ev_loop_ctx * unused
The list of unused contexts.
Definition: loop.c:190
atomic_size_t ntasks
The number of pending tasks.
Definition: loop.c:177
size_t nunused
The number of unused contexts.
Definition: loop.c:195
ev_poll_t * poll
A pointer to the interface used to poll for events (can be NULL).
Definition: loop.c:143
mtx_t mtx
The mutex protecting the task queue.
Definition: loop.c:158
int stopped
A flag specifying whether the event loop is stopped.
Definition: loop.c:180
size_t npolling
The number of polling contexts.
Definition: loop.c:188
struct sllist queue
The queue of pending tasks.
Definition: loop.c:161
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
struct slnode * next
A pointer to the next node in the list.
Definition: sllist.h:42
This header file is part of the event library; it contains the task declarations.
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads.h>, if it exists, and defines any missing functionality.
int cnd_init(cnd_t *cond)
Creates a condition variable.
int cnd_timedwait(cnd_t *cond, mtx_t *mtx, const struct timespec *ts)
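Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is signaled by a call to cnd_signal() or to cnd_broadcast(), or until after the TIME_UTC-based calendar time at ts.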
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values: mtx_plain, mtx_timed, mtx_plain | mtx_recursive, or mtx_timed | mtx_recursive.
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
pthread_cond_t cnd_t
A complete object type that holds an identifier for a condition variable.
Definition: threads.h:78
void cnd_destroy(cnd_t *cond)
Releases all resources used by the condition variable at cond.
int cnd_wait(cnd_t *cond, mtx_t *mtx)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is signaled by a call to cnd_signal() or to cnd_broadcast().
@ thrd_timedout
Indicates that the time specified in the call was reached without acquiring the requested resource.
Definition: threads.h:128
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
int cnd_signal(cnd_t *cond)
Unblocks one of the threads that are blocked on the condition variable at cond at the time of the call.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
This header file is part of the utilities library; it contains the time function declarations.
int timespec_cmp(const void *p1, const void *p2)
Compares two times.
Definition: time.h:251
int_least64_t timespec_diff_msec(const struct timespec *t1, const struct timespec *t2)
Returns the time difference (in milliseconds) between *t1 and *t2.
Definition: time.h:221