Lely core libraries  2.2.5
loop.c
Go to the documentation of this file.
1 
24 #include "ev.h"
25 #if !LELY_NO_THREADS
26 #include <lely/libc/stdatomic.h>
27 #include <lely/libc/threads.h>
28 #endif
29 #include <lely/ev/exec.h>
30 #define LELY_EV_LOOP_INLINE extern inline
31 #include <lely/ev/loop.h>
32 #include <lely/ev/std_exec.h>
33 #include <lely/ev/task.h>
34 #include <lely/util/dllist.h>
35 #include <lely/util/errnum.h>
36 #include <lely/util/time.h>
37 #include <lely/util/util.h>
38 
39 #include <assert.h>
40 #include <stdint.h>
41 #include <stdlib.h>
42 
43 #ifndef LELY_EV_LOOP_CTX_MAX_UNUSED
44 #define LELY_EV_LOOP_CTX_MAX_UNUSED 16
46 #endif
47 
49 struct ev_loop_ctx {
54  size_t refcnt;
60  struct ev_task task;
62  int *pstopped;
63 #if !LELY_NO_THREADS
64 
71  unsigned waiting : 1;
72 #endif
73  unsigned ready : 1;
76  unsigned polling : 1;
78  void *thr;
80  struct dlnode node;
85  struct ev_loop_ctx *next;
86 };
87 
88 static void ev_loop_ctx_task_func(struct ev_task *task);
89 
90 static struct ev_loop_ctx *ev_loop_ctx_alloc(void);
91 static void ev_loop_ctx_free(struct ev_loop_ctx *ctx);
92 
93 static void ev_loop_ctx_release(struct ev_loop_ctx *ctx);
94 
95 static struct ev_loop_ctx *ev_loop_ctx_create(
97 static void ev_loop_ctx_destroy(struct ev_loop_ctx *ctx);
98 
99 static size_t ev_loop_ctx_wait_one(struct ev_loop_ctx **pctx, ev_loop_t *loop,
101 static size_t ev_loop_ctx_wait_one_until(struct ev_loop_ctx **pctx,
103  const struct timespec *abs_time);
104 
105 static int ev_loop_ctx_kill(struct ev_loop_ctx *ctx, int stop);
106 
108 struct ev_loop_thrd {
110  int stopped;
112  struct ev_loop_ctx *ctx;
113 };
114 
115 #if LELY_NO_THREADS
116 static struct ev_loop_thrd ev_loop_thrd = { 0, NULL };
117 #else
118 static _Thread_local struct ev_loop_thrd ev_loop_thrd = { 0, NULL };
119 #endif
120 
121 static void ev_loop_std_exec_impl_on_task_init(ev_std_exec_impl_t *impl);
122 static void ev_loop_std_exec_impl_on_task_fini(ev_std_exec_impl_t *impl);
123 static void ev_loop_std_exec_impl_post(
124  ev_std_exec_impl_t *impl, struct ev_task *task);
125 static size_t ev_loop_std_exec_impl_abort(
126  ev_std_exec_impl_t *impl, struct ev_task *task);
127 
128 // clang-format off
129 static const struct ev_std_exec_impl_vtbl ev_loop_std_exec_impl_vtbl = {
130  &ev_loop_std_exec_impl_on_task_init,
131  &ev_loop_std_exec_impl_on_task_fini,
132  &ev_loop_std_exec_impl_post,
133  &ev_loop_std_exec_impl_abort
134 };
135 // clang-format on
136 
138 struct ev_loop {
145  size_t npoll;
152  struct ev_std_exec exec;
153 #if !LELY_NO_THREADS
154  mtx_t mtx;
156 #endif
157  struct sllist queue;
160  struct ev_task task;
167 #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
168  size_t ntasks;
169 #elif _WIN64 && !defined(__MINGW32__)
170  volatile LONGLONG ntasks;
171 #elif _WIN32 && !defined(__MINGW32__)
172  volatile LONG ntasks;
173 #else
174  atomic_size_t ntasks;
175 #endif
176  int stopped;
178 #if !LELY_NO_THREADS
179  struct dllist waiting;
181 #endif
182  struct dllist polling;
185  size_t npolling;
192  size_t nunused;
193 };
194 
195 static inline ev_loop_t *ev_loop_from_impl(const ev_std_exec_impl_t *impl);
196 
197 static int ev_loop_empty(const ev_loop_t *loop);
198 static size_t ev_loop_ntasks(const ev_loop_t *loop);
199 
200 static void ev_loop_do_stop(ev_loop_t *loop);
201 
202 static int ev_loop_kill_any(ev_loop_t *loop, int polling);
203 
204 void *
205 ev_loop_alloc(void)
206 {
207  void *ptr = malloc(sizeof(ev_loop_t));
208  if (!ptr)
209  set_errc(errno2c(errno));
210  return ptr;
211 }
212 
213 void
214 ev_loop_free(void *ptr)
215 {
216  free(ptr);
217 }
218 
219 ev_loop_t *
220 ev_loop_init(ev_loop_t *loop, ev_poll_t *poll, size_t npoll, int poll_task)
221 {
222  assert(loop);
223 
224  loop->poll = poll;
225  loop->npoll = npoll;
226 
227  loop->impl_vptr = &ev_loop_std_exec_impl_vtbl;
228  ev_std_exec_init(ev_loop_get_exec(loop), &loop->impl_vptr);
229 
230 #if !LELY_NO_THREADS
231  if (mtx_init(&loop->mtx, mtx_plain) != thrd_success)
232  return NULL;
233 #endif
234 
235  sllist_init(&loop->queue);
236 
237  loop->task = (struct ev_task)EV_TASK_INIT(NULL, NULL);
238  if (loop->poll && poll_task)
239  sllist_push_back(&loop->queue, &loop->task._node);
240 
241 #if LELY_NO_THREADS || LELY_NO_ATOMICS || (_WIN32 && !defined(__MINGW32__))
242  loop->ntasks = 0;
243 #else
244  atomic_init(&loop->ntasks, 0);
245 #endif
246 
247  loop->stopped = 0;
248 
249 #if !LELY_NO_THREADS
250  dllist_init(&loop->waiting);
251 #endif
252  dllist_init(&loop->polling);
253  loop->npolling = 0;
254 
255  loop->unused = NULL;
256  loop->nunused = 0;
257 
258  return loop;
259 }
260 
261 void
262 ev_loop_fini(ev_loop_t *loop)
263 {
264  assert(loop);
265 
266  while (loop->unused) {
267  struct ev_loop_ctx *ctx = loop->unused;
268  loop->unused = ctx->next;
269  ev_loop_ctx_free(ctx);
270  }
271 
272  assert(!loop->npolling);
273  assert(dllist_empty(&loop->polling));
274 #if !LELY_NO_THREADS
275  assert(dllist_empty(&loop->waiting));
276 #endif
277 
278 #if !LELY_NO_THREADS
279  mtx_destroy(&loop->mtx);
280 #endif
281 
282  ev_std_exec_fini(ev_loop_get_exec(loop));
283 }
284 
285 ev_loop_t *
286 ev_loop_create(ev_poll_t *poll, size_t npoll, int poll_task)
287 {
288  int errc = 0;
289 
290  ev_loop_t *loop = ev_loop_alloc();
291  if (!loop) {
292  errc = get_errc();
293  goto error_alloc;
294  }
295 
296  ev_loop_t *tmp = ev_loop_init(loop, poll, npoll, poll_task);
297  if (!tmp) {
298  errc = get_errc();
299  goto error_init;
300  }
301  loop = tmp;
302 
303  return loop;
304 
305 error_init:
306  ev_loop_free(loop);
307 error_alloc:
308  set_errc(errc);
309  return NULL;
310 }
311 
312 void
314 {
315  if (loop) {
316  ev_loop_fini(loop);
317  ev_loop_free(loop);
318  }
319 }
320 
321 ev_poll_t *
323 {
324  assert(loop);
325 
326  return loop->poll;
327 }
328 
329 ev_exec_t *
331 {
332  assert(loop);
333 
334  return &loop->exec.exec_vptr;
335 }
336 
337 void
339 {
340  assert(loop);
341 
342 #if !LELY_NO_THREADS
343  mtx_lock(&loop->mtx);
344 #endif
345  ev_loop_do_stop(loop);
346 #if !LELY_NO_THREADS
347  mtx_unlock(&loop->mtx);
348 #endif
349 }
350 
351 int
353 {
354  assert(loop);
355 
356 #if !LELY_NO_THREADS
357  mtx_lock((mtx_t *)&loop->mtx);
358 #endif
359  int stopped = loop->stopped;
360 #if !LELY_NO_THREADS
361  mtx_unlock((mtx_t *)&loop->mtx);
362 #endif
363  return stopped;
364 }
365 
366 void
368 {
369  assert(loop);
370 
371 #if !LELY_NO_THREADS
372  mtx_lock(&loop->mtx);
373 #endif
374  loop->stopped = 0;
375 #if !LELY_NO_THREADS
376  mtx_unlock(&loop->mtx);
377 #endif
378 }
379 
380 size_t
382 {
383  size_t n = 0;
384  struct ev_loop_ctx *ctx = NULL;
385  while (ev_loop_ctx_wait_one(&ctx, loop, future))
386  n += n < SIZE_MAX;
387  ev_loop_ctx_destroy(ctx);
388  return n;
389 }
390 
391 size_t
393  const struct timespec *abs_time)
394 {
395  size_t n = 0;
396  struct ev_loop_ctx *ctx = NULL;
397  while (ev_loop_ctx_wait_one_until(&ctx, loop, future, abs_time))
398  n += n < SIZE_MAX;
399  ev_loop_ctx_destroy(ctx);
400  return n;
401 }
402 
403 size_t
405 {
406  struct ev_loop_ctx *ctx = NULL;
407  size_t n = ev_loop_ctx_wait_one(&ctx, loop, future);
408  ev_loop_ctx_destroy(ctx);
409  return n;
410 }
411 
412 size_t
414  const struct timespec *abs_time)
415 {
416  struct ev_loop_ctx *ctx = NULL;
417  size_t n = ev_loop_ctx_wait_one_until(&ctx, loop, future, abs_time);
418  ev_loop_ctx_destroy(ctx);
419  return n;
420 }
421 
422 void *
424 {
425  return &ev_loop_thrd;
426 }
427 
428 int
429 ev_loop_kill(ev_loop_t *loop, void *thr_)
430 {
431 #if LELY_NO_THREADS
432  (void)loop;
433 #else
434  assert(loop);
435 #endif
436  struct ev_loop_thrd *thr = thr_;
437  assert(thr);
438 
439  int result = 0;
440  int errc = get_errc();
441 #if !LELY_NO_THREADS
442  mtx_lock(&loop->mtx);
443 #endif
444  if (!thr->stopped) {
445  if (thr->ctx) {
446  if ((result = ev_loop_ctx_kill(thr->ctx, 1)) == -1)
447  errc = get_errc();
448  } else {
449  thr->stopped = 1;
450  }
451  }
452 #if !LELY_NO_THREADS
453  mtx_unlock(&loop->mtx);
454 #endif
455  set_errc(errc);
456  return result;
457 }
458 
459 static void
460 ev_loop_ctx_task_func(struct ev_task *task)
461 {
462  assert(task);
463  struct ev_loop_ctx *ctx = structof(task, struct ev_loop_ctx, task);
464  ev_loop_t *loop = ctx->loop;
465  assert(loop);
466  assert(ctx->future);
467 
468 #if LELY_NO_THREADS
469  (void)loop;
470 #else
471  mtx_lock(&loop->mtx);
472 #endif
473  if (ctx->refcnt > 1 && !*ctx->pstopped && !ctx->ready) {
474  ctx->ready = 1;
475  ev_loop_ctx_kill(ctx, 0);
476  }
477 #if !LELY_NO_THREADS
478  mtx_unlock(&loop->mtx);
479 #endif
480  ev_loop_ctx_release(ctx);
481 }
482 
483 static struct ev_loop_ctx *
484 ev_loop_ctx_alloc(void)
485 {
486  int errc = 0;
487 
488  struct ev_loop_ctx *ctx = malloc(sizeof(*ctx));
489  if (!ctx) {
490  errc = errno2c(errno);
491  goto error_malloc_ctx;
492  }
493 
494  ctx->refcnt = 0;
495 
496  ctx->loop = NULL;
497 
498  ctx->future = NULL;
499  ctx->task = (struct ev_task)EV_TASK_INIT(NULL, &ev_loop_ctx_task_func);
500 
501  ctx->pstopped = NULL;
502 #if !LELY_NO_THREADS
503  if (cnd_init(&ctx->cond) != thrd_success) {
504  errc = get_errc();
505  goto error_init_cond;
506  }
507  ctx->waiting = 0;
508 #endif
509  ctx->ready = 0;
510  ctx->polling = 0;
511  ctx->thr = NULL;
512 
513  dlnode_init(&ctx->node);
514  ctx->next = NULL;
515 
516  return ctx;
517 
518 #if !LELY_NO_THREADS
519  // cnd_destroy(&ctx->cond);
520 error_init_cond:
521 #endif
522  free(ctx);
523 error_malloc_ctx:
524  set_errc(errc);
525  return NULL;
526 }
527 
528 static void
529 ev_loop_ctx_free(struct ev_loop_ctx *ctx)
530 {
531  if (ctx) {
532 #if !LELY_NO_THREADS
533  cnd_destroy(&ctx->cond);
534 #endif
535  free(ctx);
536  }
537 }
538 
539 static void
540 ev_loop_ctx_release(struct ev_loop_ctx *ctx)
541 {
542  assert(ctx);
543  ev_loop_t *loop = ctx->loop;
544  assert(loop);
545 
546 #if !LELY_NO_THREADS
547  mtx_lock(&loop->mtx);
548 #endif
549  if (--ctx->refcnt) {
550 #if !LELY_NO_THREADS
551  mtx_unlock(&loop->mtx);
552 #endif
553  return;
554  }
555  ev_future_t *future = ctx->future;
556  ctx->future = NULL;
557 #if !LELY_NO_THREADS
558  assert(!ctx->waiting);
559 #endif
560  assert(!ctx->polling);
561  if (loop->nunused < LELY_EV_LOOP_CTX_MAX_UNUSED) {
562  ctx->next = loop->unused;
563  loop->unused = ctx;
564  loop->nunused++;
565 #if !LELY_NO_THREADS
566  mtx_unlock(&loop->mtx);
567 #endif
568  ev_future_release(future);
569  } else {
570 #if !LELY_NO_THREADS
571  mtx_unlock(&loop->mtx);
572 #endif
573  ev_future_release(future);
574  ev_loop_ctx_free(ctx);
575  }
576 }
577 
578 static struct ev_loop_ctx *
579 ev_loop_ctx_create(ev_loop_t *loop, ev_future_t *future)
580 {
581  assert(loop);
582 
583 #if !LELY_NO_THREADS
584  mtx_lock(&loop->mtx);
585 #endif
586  struct ev_loop_ctx *ctx = loop->unused;
587  if (ctx) {
588  loop->unused = ctx->next;
589  assert(loop->nunused);
590  loop->nunused--;
591 #if !LELY_NO_THREADS
592  mtx_unlock(&loop->mtx);
593 #endif
594  } else {
595 #if !LELY_NO_THREADS
596  mtx_unlock(&loop->mtx);
597 #endif
598  ctx = ev_loop_ctx_alloc();
599  if (!ctx)
600  return NULL;
601  }
602 
603  assert(!ctx->refcnt);
604  ctx->refcnt++;
605 
606  ctx->loop = loop;
607 
608  ctx->future = ev_future_acquire(future);
609  ctx->task = (struct ev_task)EV_TASK_INIT(
610  ev_loop_get_exec(loop), &ev_loop_ctx_task_func);
611 
612  ctx->pstopped = &ev_loop_thrd.stopped;
613 
614 #if !LELY_NO_THREADS
615  assert(!ctx->waiting);
616 #endif
617  ctx->ready = 0;
618  assert(!ctx->polling);
619  ctx->thr = loop->poll ? ev_poll_self(loop->poll) : NULL;
620 
621  dlnode_init(&ctx->node);
622 
623  ctx->next = ev_loop_thrd.ctx;
624  ev_loop_thrd.ctx = ctx;
625 
626  if (ctx->future) {
627  ctx->refcnt++;
628  ev_future_submit(ctx->future, &ctx->task);
629  }
630 
631  return ctx;
632 }
633 
634 static void
635 ev_loop_ctx_destroy(struct ev_loop_ctx *ctx)
636 {
637  if (ctx) {
638  if (ctx->future)
639  ev_future_cancel(ctx->future, &ctx->task);
640 
641  assert(ev_loop_thrd.ctx == ctx);
642  ev_loop_thrd.ctx = ctx->next;
643 
644  ev_loop_ctx_release(ctx);
645  }
646 }
647 
648 static size_t
649 ev_loop_ctx_wait_one(
650  struct ev_loop_ctx **pctx, ev_loop_t *loop, ev_future_t *future)
651 {
652  assert(pctx);
653  struct ev_loop_ctx *ctx = *pctx;
654  assert(loop);
655  assert(!ctx || ctx->loop == loop);
656 
657  size_t n = 0;
658 #if !LELY_NO_THREADS
659  mtx_lock(&loop->mtx);
660 #endif
661  int poll_task = 0;
662  while (!loop->stopped && (!ctx || (!*ctx->pstopped && !ctx->ready))) {
663  // Stop the event loop if no more tasks remain or have been
664  // announced with ev_exec_on_task_init(), unless we should be
665  // waiting on a future but have not created the event loop
666  // context yet.
667  if (ev_loop_empty(loop) && !ev_loop_ntasks(loop)
668  && (!future || ctx)) {
669  ev_loop_do_stop(loop);
670  continue;
671  }
672  struct ev_task *task = ev_task_from_node(
673  sllist_pop_front(&loop->queue));
674  if (task && task == &loop->task) {
675  // The polling task is not a real task, but is part of
676  // the task queue for scheduling purposes.
677  poll_task = 1;
678  task = NULL;
679  }
680  if (task) {
681  // If a real task is available, execute it and return.
682 #if !LELY_NO_THREADS
683  mtx_unlock(&loop->mtx);
684 #endif
685  assert(task->exec);
686  ev_exec_run(task->exec, task);
687  n++;
688 #if !LELY_NO_THREADS
689  mtx_lock(&loop->mtx);
690 #endif
691  break;
692  }
693  if (!ctx) {
694  // Only create an event loop context when we have to
695  // poll or wait.
696 #if !LELY_NO_THREADS
697  mtx_unlock(&loop->mtx);
698 #endif
699  ctx = *pctx = ev_loop_ctx_create(loop, future);
700  if (!ctx) {
701 #if !LELY_NO_THREADS
702  mtx_lock(&loop->mtx);
703 #endif
704  break;
705  }
706 #if !LELY_NO_THREADS
707  mtx_lock(&loop->mtx);
708 #endif
709  // We released the lock, so a task may have been queued.
710  continue;
711  }
712  if (loop->poll && (!loop->npoll || loop->npolling < loop->npoll)) {
713  ctx->polling = 1;
714  // Wake polling threads in LIFO order.
715  dllist_push_front(&loop->polling, &ctx->node);
716  loop->npolling++;
717  int empty = sllist_empty(&loop->queue);
718 #if !LELY_NO_THREADS
719  mtx_unlock(&loop->mtx);
720 #endif
721  int result = ev_poll_wait(loop->poll, empty ? -1 : 0);
722 #if !LELY_NO_THREADS
723  mtx_lock(&loop->mtx);
724 #endif
725  loop->npolling--;
726  dllist_remove(&loop->polling, &ctx->node);
727  ctx->polling = 0;
728  if (result == -1)
729  break;
730  } else if (!sllist_empty(&loop->queue)) {
731  continue;
732  } else {
733 #if LELY_NO_THREADS
734  break;
735 #else // !LELY_NO_THREADS
736  ctx->waiting = 1;
737  // Wake waiting threads in LIFO order.
738  dllist_push_front(&loop->waiting, &ctx->node);
739  int result = cnd_wait(&ctx->cond, &loop->mtx);
740  dllist_remove(&loop->waiting, &ctx->node);
741  ctx->waiting = 0;
742  if (result != thrd_success)
743  break;
744 #endif // !LELY_NO_THREADS
745  }
746  }
747  int empty = sllist_empty(&loop->queue);
748  if (poll_task)
749  // Requeue the polling task.
750  sllist_push_back(&loop->queue, &loop->task._node);
751  if (!empty)
752  // If any real tasks remain on the queue, wake up any polling or
753  // non-polling thread.
754  ev_loop_kill_any(loop, 1);
755  else if (poll_task)
756  // Wake up any non-polling thread so it can start polling.
757  ev_loop_kill_any(loop, 0);
758  if (!n && ctx && *ctx->pstopped)
759  // Reset the thread-local flag used to stop an event loop with
760  // ev_loop_kill(), so it will resume on the next run funciton.
761  *ctx->pstopped = 0;
762 #if !LELY_NO_THREADS
763  mtx_unlock(&loop->mtx);
764 #endif
765  return n;
766 }
767 
768 #include <stdio.h>
769 
770 static size_t
771 ev_loop_ctx_wait_one_until(struct ev_loop_ctx **pctx, ev_loop_t *loop,
772  ev_future_t *future, const struct timespec *abs_time)
773 {
774  assert(pctx);
775  struct ev_loop_ctx *ctx = *pctx;
776  assert(loop);
777  assert(!ctx || ctx->loop == loop);
778 #if LELY_NO_THREADS && LELY_NO_TIMEOUT
779  (void)abs_time;
780 #endif
781 
782  size_t n = 0;
783 #if !LELY_NO_THREADS
784  mtx_lock(&loop->mtx);
785 #endif
786  int poll_task = 0;
787  while (!loop->stopped && (!ctx || (!*ctx->pstopped && !ctx->ready))) {
788  // Stop the event loop if no more tasks remain or have been
789  // announced with ev_exec_on_task_init(), unless we should be
790  // waiting on a future but have not created the event loop
791  // context yet.
792  if (ev_loop_empty(loop) && !ev_loop_ntasks(loop)
793  && (!future || ctx)) {
794  ev_loop_do_stop(loop);
795  continue;
796  }
797  struct ev_task *task = ev_task_from_node(
798  sllist_pop_front(&loop->queue));
799  if (task && task == &loop->task) {
800  // The polling task is not a real task, but is part of
801  // the task queue for scheduling purposes.
802  poll_task = 1;
803  task = NULL;
804  }
805  if (task) {
806  // If a real task is available, execute it and return.
807 #if !LELY_NO_THREADS
808  mtx_unlock(&loop->mtx);
809 #endif
810  assert(task->exec);
811  ev_exec_run(task->exec, task);
812  n++;
813 #if !LELY_NO_THREADS
814  mtx_lock(&loop->mtx);
815 #endif
816  break;
817  }
818  if (!ctx) {
819  // Only create an event loop context when we have to
820  // poll or wait.
821 #if !LELY_NO_THREADS
822  mtx_unlock(&loop->mtx);
823 #endif
824  ctx = *pctx = ev_loop_ctx_create(loop, future);
825  if (!ctx) {
826 #if !LELY_NO_THREADS
827  mtx_lock(&loop->mtx);
828 #endif
829  break;
830  }
831 #if !LELY_NO_THREADS
832  mtx_lock(&loop->mtx);
833 #endif
834  // We released the lock, so a task may have been queued.
835  continue;
836  }
837  if (loop->poll && (!loop->npoll || loop->npolling < loop->npoll)) {
838  ctx->polling = 1;
839  // Wake polling threads in LIFO order.
840  dllist_push_front(&loop->polling, &ctx->node);
841  loop->npolling++;
842 #if !LELY_NO_TIMEOUT
843  int empty = sllist_empty(&loop->queue);
844 #endif
845 #if !LELY_NO_THREADS
846  mtx_unlock(&loop->mtx);
847 #endif
848  int result = -1;
849  int64_t msec = 0;
850 #if !LELY_NO_TIMEOUT
851  if (empty && abs_time) {
852  struct timespec now = { 0, 0 };
853  if (!timespec_get(&now, TIME_UTC))
854  goto error;
855  if (timespec_cmp(abs_time, &now) <= 0) {
857  goto error;
858  }
859  msec = timespec_diff_msec(abs_time, &now);
860  if (!msec)
861  // Prevent a busy loop due to rounding.
862  msec = 1;
863  else if (msec > INT_MAX)
864  msec = INT_MAX;
865  }
866 #endif // !LELY_NO_TIMEOUT
867  result = ev_poll_wait(loop->poll, msec);
868 #if !LELY_NO_TIMEOUT
869  error:
870 #endif
871 #if !LELY_NO_THREADS
872  mtx_lock(&loop->mtx);
873 #endif
874  loop->npolling--;
875  dllist_remove(&loop->polling, &ctx->node);
876  ctx->polling = 0;
877  if (result == -1)
878  break;
879  } else if (!sllist_empty(&loop->queue)) {
880  continue;
881 #if !LELY_NO_THREADS && !LELY_NO_TIMEOUT
882  } else if (abs_time) {
883  ctx->waiting = 1;
884  // Wake waiting threads in LIFO order.
885  dllist_push_front(&loop->waiting, &ctx->node);
886  int result = cnd_timedwait(
887  &ctx->cond, &loop->mtx, abs_time);
888  dllist_remove(&loop->waiting, &ctx->node);
889  ctx->waiting = 0;
890  if (result != thrd_success) {
891  if (result == thrd_timedout)
893  break;
894  }
895 #endif // !LELY_NO_THREADS
896  } else {
897  break;
898  }
899  }
900  int empty = sllist_empty(&loop->queue);
901  if (poll_task)
902  // Requeue the polling task.
903  sllist_push_back(&loop->queue, &loop->task._node);
904  if (!empty)
905  // If any real tasks remain on the queue, wake up any polling or
906  // non-polling thread.
907  ev_loop_kill_any(loop, 1);
908  else if (poll_task)
909  // Wake up any non-polling thread so it can start polling.
910  ev_loop_kill_any(loop, 0);
911  if (!n && ctx && *ctx->pstopped)
912  // Reset the thread-local flag used to stop an event loop with
913  // ev_loop_kill(), so it will resume on the next run funciton.
914  *ctx->pstopped = 0;
915 #if !LELY_NO_THREADS
916  mtx_unlock(&loop->mtx);
917 #endif
918  return n;
919 }
920 
921 static int
922 ev_loop_ctx_kill(struct ev_loop_ctx *ctx, int stop)
923 {
924  assert(ctx);
925  assert(ctx->loop);
926  assert(ctx->pstopped);
927  assert(!ctx->polling || ctx->loop->poll);
928 
929  if (*ctx->pstopped)
930  return 0;
931  else if (stop)
932  *ctx->pstopped = 1;
933 
934 #if !LELY_NO_THREADS
935  if (ctx->waiting) {
936  cnd_signal(&ctx->cond);
937  return 0;
938  }
939 #endif
940  return ctx->polling ? ev_poll_kill(ctx->loop->poll, ctx->thr) : 0;
941 }
942 
943 static void
944 ev_loop_std_exec_impl_on_task_init(ev_std_exec_impl_t *impl)
945 {
946  ev_loop_t *loop = ev_loop_from_impl(impl);
947 
948 #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
949  loop->ntasks++;
950 #elif _WIN64 && !defined(__MINGW32__)
951  InterlockedIncrementNoFence64(&loop->ntasks);
952 #elif _WIN32 && !defined(__MINGW32__)
953  InterlockedIncrementNoFence(&loop->ntasks);
954 #else
955  atomic_fetch_add_explicit(&loop->ntasks, 1, memory_order_relaxed);
956 #endif
957 }
958 
959 static void
960 ev_loop_std_exec_impl_on_task_fini(ev_std_exec_impl_t *impl)
961 {
962  ev_loop_t *loop = ev_loop_from_impl(impl);
963 
964 #if LELY_NO_THREADS || (LELY_NO_ATOMICS && (!_WIN32 || defined(__MINGW32__)))
965  if (!--loop->ntasks) {
966 #elif _WIN64 && !defined(__MINGW32__)
967  if (!InterlockedDecrementRelease64(&loop->ntasks)) {
968  MemoryBarrier();
969 #elif _WIN32 && !defined(__MINGW32__)
970  if (!InterlockedDecrementRelease(&loop->ntasks)) {
971  MemoryBarrier();
972 #else
973  if (atomic_fetch_sub_explicit(&loop->ntasks, 1, memory_order_release)
974  == 1) {
975  atomic_thread_fence(memory_order_acquire);
976 #endif
977 #if !LELY_NO_THREADS
978  mtx_lock(&loop->mtx);
979 #endif
980  if (ev_loop_empty(loop))
981  ev_loop_do_stop(loop);
982 #if !LELY_NO_THREADS
983  mtx_unlock(&loop->mtx);
984 #endif
985  }
986 }
987 
988 static void
989 ev_loop_std_exec_impl_post(ev_std_exec_impl_t *impl, struct ev_task *task)
990 {
991  ev_loop_t *loop = ev_loop_from_impl(impl);
992  assert(task);
993 
994 #if !LELY_NO_THREADS
995  mtx_lock(&loop->mtx);
996 #endif
997  int empty = sllist_empty(&loop->queue);
998  sllist_push_back(&loop->queue, &task->_node);
999  if (empty)
1000  ev_loop_kill_any(loop, 1);
1001 #if !LELY_NO_THREADS
1002  mtx_unlock(&loop->mtx);
1003 #endif
1004 }
1005 
1006 static size_t
1007 ev_loop_std_exec_impl_abort(ev_std_exec_impl_t *impl, struct ev_task *task)
1008 {
1009  ev_loop_t *loop = ev_loop_from_impl(impl);
1010 
1011  struct sllist queue;
1012  sllist_init(&queue);
1013 
1014 #if !LELY_NO_THREADS
1015  mtx_lock(&loop->mtx);
1016 #endif
1017  if (!task) {
1018  int poll_task = 0;
1019  struct slnode *node;
1020  while ((node = sllist_pop_front(&loop->queue))) {
1021  if (ev_task_from_node(node) == &loop->task)
1022  poll_task = 1;
1023  else
1024  sllist_push_back(&queue, node);
1025  }
1026  if (poll_task)
1027  sllist_push_back(&loop->queue, &loop->task._node);
1028  } else if (sllist_remove(&loop->queue, &task->_node)) {
1029  sllist_push_back(&queue, &task->_node);
1030  }
1031 #if !LELY_NO_THREADS
1032  mtx_unlock(&loop->mtx);
1033 #endif
1034 
1035  size_t n = 0;
1036  while (sllist_pop_front(&queue))
1037  n += n < SIZE_MAX;
1038  return n;
1039 }
1040 
1041 static inline ev_loop_t *
1042 ev_loop_from_impl(const ev_std_exec_impl_t *impl)
1043 {
1044  assert(impl);
1045 
1046  return structof(impl, ev_loop_t, impl_vptr);
1047 }
1048 
1049 static int
1050 ev_loop_empty(const ev_loop_t *loop)
1051 {
1052  assert(loop);
1053 
1054  if (sllist_empty(&loop->queue))
1055  return 1;
1056  struct slnode *node = sllist_first(&loop->queue);
1057  // The task queue is considered empty if only the polling task remains.
1058  return node == &loop->task._node && node->next == NULL;
1059 }
1060 
1061 static size_t
1062 ev_loop_ntasks(const ev_loop_t *loop)
1063 {
1064  assert(loop);
1065 
1066 #if LELY_NO_THREADS || LELY_NO_ATOMICS || (_WIN32 && !defined(__MINGW32__))
1067  return loop->ntasks;
1068 #else
1069  return atomic_load_explicit(
1070  (atomic_size_t *)&loop->ntasks, memory_order_relaxed);
1071 #endif
1072 }
1073 
1074 static void
1075 ev_loop_do_stop(ev_loop_t *loop)
1076 {
1077  assert(loop);
1078 
1079  if (!loop->stopped) {
1080  loop->stopped = 1;
1081 #if !LELY_NO_THREADS
1082  dllist_foreach (&loop->waiting, node) {
1083  struct ev_loop_ctx *ctx = structof(
1084  node, struct ev_loop_ctx, node);
1085  ev_loop_ctx_kill(ctx, 1);
1086  }
1087 #endif
1088  dllist_foreach (&loop->polling, node) {
1089  struct ev_loop_ctx *ctx = structof(
1090  node, struct ev_loop_ctx, node);
1091  ev_loop_ctx_kill(ctx, 1);
1092  }
1093  }
1094 }
1095 
1096 static int
1097 ev_loop_kill_any(ev_loop_t *loop, int polling)
1098 {
1099  assert(loop);
1100 
1101  struct dlnode *node;
1102 #if !LELY_NO_THREADS
1103  if ((node = dllist_first(&loop->waiting))) {
1104  struct ev_loop_ctx *ctx =
1105  structof(node, struct ev_loop_ctx, node);
1106  return ev_loop_ctx_kill(ctx, 0);
1107  }
1108 #endif
1109  if (polling && (node = dllist_first(&loop->polling))) {
1110  struct ev_loop_ctx *ctx =
1111  structof(node, struct ev_loop_ctx, node);
1112  return ev_loop_ctx_kill(ctx, 0);
1113  }
1114  return 0;
1115 }
void * ev_loop_self(void)
Returns the identifier of the calling thread.
Definition: loop.c:423
int ev_loop_stopped(const ev_loop_t *loop)
Returns 1 if the event loop is stopped, and 0 if not.
Definition: loop.c:352
struct ev_loop_ctx * ctx
A pointer to the event loop context for this thread.
Definition: loop.c:112
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
void * thr
The thread identifier of the polling instance.
Definition: loop.c:78
This header file is part of the event library; it contains the standard executor declarations.
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
A polling event loop.
Definition: loop.c:138
ev_loop_t * loop
A pointer to the event loop managing this context.
Definition: loop.c:56
void ev_loop_restart(ev_loop_t *loop)
Restarts an event loop.
Definition: loop.c:367
#define dllist_foreach(list, node)
Iterates in order over each node in a doubly-linked list.
Definition: dllist.h:224
size_t refcnt
The number of references to this context.
Definition: loop.c:54
size_t ev_loop_wait_one(ev_loop_t *loop, ev_future_t *future)
If the event loop has pending tasks, runs a single task.
Definition: loop.c:404
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
int * pstopped
The address of the stopped flag of the thread.
Definition: loop.c:62
int cnd_signal(cnd_t *cond)
Unblocks one of the threads that are blocked on the condition variable at cond at the time of the cal...
int ev_loop_kill(ev_loop_t *loop, void *thr_)
Interrupts an event loop running on the specified thread.
Definition: loop.c:429
Indicates that the time specified in the call was reached without acquiring the requested resource...
Definition: threads.h:128
struct ev_loop_ctx * next
A pointer to the next context in the list of running or unused contexts.
Definition: loop.c:85
void set_errnum(errnum_t errnum)
Sets the current (thread-specific) platform-independent error number to errnum.
Definition: errnum.h:375
Indicates that the requested operation succeeded.
Definition: threads.h:121
size_t npoll
The number of threads allowed to poll simultaneously.
Definition: loop.c:145
ev_poll_t * ev_loop_get_poll(const ev_loop_t *loop)
Returns a pointer to the polling instance used by the event loop, or NULL if the loop does not poll...
Definition: loop.c:322
struct ev_task task
The task to be executed once the future is ready.
Definition: loop.c:60
int timespec_get(struct timespec *ts, int base)
Sets the interval at ts to hold the current calendar time based on the specified time base...
Definition: time.c:32
unsigned waiting
A flag indicating if a thread is waiting on cond.
Definition: loop.c:71
unsigned polling
A flag indicating if a thread is polling.
Definition: loop.c:76
A node in a singly-linked list.
Definition: sllist.h:39
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:221
struct slnode * sllist_first(const struct sllist *list)
Returns a pointer to the first node in a singly-linked list.
Definition: sllist.h:244
A future.
Definition: future.c:63
size_t ev_future_cancel(ev_future_t *future, struct ev_task *task)
Cancels the specified task submitted with ev_future_submit(), if it has not yet been scheduled for ex...
Definition: future.c:386
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:957
ev_poll_t * poll
A pointer to the interface used to poll for events (can be NULL).
Definition: loop.c:140
A doubly-linked list.
Definition: dllist.h:53
This header file is part of the event library; it contains the polling event loop declarations...
int ev_poll_wait(ev_poll_t *poll, int timeout)
Waits for at most timeout milliseconds while polling for new events.
Definition: poll.h:75
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:213
const struct ev_std_exec_impl_vtbl * impl_vptr
A pointer to the virtual table containing the interface used by the standard executor (exec)...
Definition: loop.c:150
void ev_future_release(ev_future_t *future)
Releases a reference to a future.
Definition: future.c:325
This header file is part of the C11 and POSIX compatibility library; it includes <threads.h>, if it exists, and defines any missing functionality.
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
struct dllist polling
The list of polling contexts.
Definition: loop.c:183
pthread_cond_t cnd_t
A complete object type that holds an identifier for a condition variable.
Definition: threads.h:78
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
int dllist_empty(const struct dllist *list)
Returns 1 if the doubly-linked list is empty, and 0 if not.
Definition: dllist.h:270
This header file is part of the utilities library; it contains the native and platform-independent er...
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
This header file is part of the utilities library; it contains the time function declarations.
ev_future_t * ev_future_acquire(ev_future_t *future)
Acquires a reference to a future.
Definition: future.c:317
This header file is part of the C11 and POSIX compatibility library; it includes <stdatomic.h>, if it exists, and defines any missing functionality.
int cnd_init(cnd_t *cond)
Creates a condition variable.
struct dlnode node
The node of this context in the list of waiting or polling contexts.
Definition: loop.c:80
ev_loop_t * ev_loop_create(ev_poll_t *poll, size_t npoll, int poll_task)
Creates a new polling event loop.
Definition: loop.c:286
A singly-linked list.
Definition: sllist.h:51
int stopped
A flag specifying whether the event loop is stopped.
Definition: loop.c:177
void ev_loop_destroy(ev_loop_t *loop)
Destroys a polling event loop.
Definition: loop.c:313
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function...
Definition: errnum.c:947
struct ev_task task
The task used to trigger polling.
Definition: loop.c:160
int cnd_timedwait(cnd_t *cond, mtx_t *mtx, const struct timespec *ts)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is si...
const struct ev_poll_vtbl *const ev_poll_t
The abstract polling interface.
Definition: poll.h:32
size_t ev_loop_wait_one_until(ev_loop_t *loop, ev_future_t *future, const struct timespec *abs_time)
If the event loop has pending tasks, runs a single task.
Definition: loop.c:413
struct slnode * next
A pointer to the next node in the list.
Definition: sllist.h:41
This header file is part of the utilities library; it contains the doubly-linked list declarations...
size_t npolling
The number of polling contexts.
Definition: loop.c:185
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:43
struct dllist waiting
The list of waiting contexts.
Definition: loop.c:180
int cnd_wait(cnd_t *cond, mtx_t *mtx)
Atomically unlocks the mutex at mtx and endeavors to block until the condition variable at cond is si...
int timespec_cmp(const void *p1, const void *p2)
Compares two times.
Definition: time.h:228
A node in a doubly-linked list.
Definition: dllist.h:39
void dlnode_init(struct dlnode *node)
Initializes a node in a doubly-linked list.
Definition: dllist.h:227
This header file is part of the C11 and POSIX compatibility library; it includes <stdint.h> and defines any missing functionality.
This header file is part of the event library; it contains the task declarations. ...
int_least64_t timespec_diff_msec(const struct timespec *t1, const struct timespec *t2)
Returns the time difference (in milliseconds) between *t1 and *t2.
Definition: time.h:207
This header file is part of the C11 and POSIX compatibility library; it includes <stdio.h> and defines any missing functionality.
An executable task.
Definition: task.h:41
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:190
int ev_poll_kill(ev_poll_t *poll, void *thr)
Interrupts a polling wait on the specified thread.
Definition: poll.h:81
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
This header file is part of the event library; it contains the abstract task executor interface...
An event loop context.
Definition: loop.c:49
unsigned ready
A flag indicating if future is ready.
Definition: loop.c:74
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread stor...
Definition: features.h:239
void dllist_push_front(struct dllist *list, struct dlnode *node)
Pushes a node to the front of a doubly-linked list.
Definition: dllist.h:285
size_t nunused
The number of unused contexts.
Definition: loop.c:192
void cnd_destroy(cnd_t *cond)
Releases all resources used by the condition variable at cond.
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node...
Definition: task.c:32
struct sllist queue
The queue of pending tasks.
Definition: loop.c:158
#define LELY_EV_LOOP_CTX_MAX_UNUSED
The maximum number of unused contexts per event loop.
Definition: loop.c:45
struct ev_loop_ctx * unused
The list of unused contexts.
Definition: loop.c:187
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib.h> and defines any missing functionality.
atomic_size_t ntasks
The number of pending tasks.
Definition: loop.c:174
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:184
This is the internal header file of the event library.
size_t ev_loop_wait(ev_loop_t *loop, ev_future_t *future)
Equivalent to size_t n = 0; while (ev_loop_wait_one(loop, future)) n += n < SIZE_MAX; return ...
Definition: loop.c:381
size_t ev_loop_wait_until(ev_loop_t *loop, ev_future_t *future, const struct timespec *abs_time)
Equivalent to size_t n = 0; while (ev_loop_wait_one_until(loop, future, abs_time)) n += n < S...
Definition: loop.c:392
void dllist_init(struct dllist *list)
Initializes a doubly-linked list.
Definition: dllist.h:263
struct ev_std_exec exec
The executor corresponding to the event loop.
Definition: loop.c:152
void ev_exec_run(ev_exec_t *exec, struct ev_task *task)
Invokes the task function in *task as if the task is being executed by *exec.
Definition: exec.h:144
mtx_t mtx
The mutex protecting the task queue.
Definition: loop.c:155
struct dlnode * dllist_first(const struct dllist *list)
Returns a pointer to the first node in a doubly-linked list.
Definition: dllist.h:376
cnd_t cond
The condition variable used by threads to wait for a task to be submitted to the event loop or for th...
Definition: loop.c:69
int stopped
A flag used to interrupt the event loop on this thread.
Definition: loop.c:110
This is the public header file of the utilities library.
void ev_loop_stop(ev_loop_t *loop)
Stops the event loop.
Definition: loop.c:338
void * ev_poll_self(const ev_poll_t *poll)
Returns the identifier of the calling thread.
Definition: poll.h:69
ev_exec_t * ev_loop_get_exec(const ev_loop_t *loop)
Returns a pointer to the executor corresponding to the event loop.
Definition: loop.c:330
void ev_future_submit(ev_future_t *future, struct ev_task *task)
Submits a task to be executed once the specified future is ready.
Definition: future.c:360
Connection timed out.
Definition: errnum.h:226
An event loop thread.
Definition: loop.c:108
void dllist_remove(struct dllist *list, struct dlnode *node)
Removes a node from a doubly-linked list.
Definition: dllist.h:349
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values: ...
ev_future_t * future
The future on which the loop is waiting.
Definition: loop.c:58