Lely core libraries  2.3.4
tqueue.c
Go to the documentation of this file.
1 
24 #include "io2.h"
25 
26 #if !LELY_NO_MALLOC
27 
28 #if !LELY_NO_THREADS
29 #include <lely/libc/threads.h>
30 #endif
31 #include <lely/ev/exec.h>
32 #include <lely/io2/ctx.h>
33 #include <lely/io2/tqueue.h>
34 #include <lely/util/errnum.h>
35 #include <lely/util/time.h>
36 #include <lely/util/util.h>
37 
38 #include <assert.h>
39 #include <stdlib.h>
40 
41 static io_ctx_t *io_tqueue_dev_get_ctx(const io_dev_t *dev);
42 static ev_exec_t *io_tqueue_dev_get_exec(const io_dev_t *dev);
43 static size_t io_tqueue_dev_cancel(io_dev_t *dev, struct ev_task *task);
44 static size_t io_tqueue_dev_abort(io_dev_t *dev, struct ev_task *task);
45 
46 // clang-format off
47 static const struct io_dev_vtbl io_tqueue_dev_vtbl = {
48  &io_tqueue_dev_get_ctx,
49  &io_tqueue_dev_get_exec,
50  &io_tqueue_dev_cancel,
51  &io_tqueue_dev_abort
52 };
53 // clang-format on
54 
55 static void io_tqueue_svc_shutdown(struct io_svc *svc);
56 
57 // clang-format off
58 static const struct io_svc_vtbl io_tqueue_svc_vtbl = {
59  NULL,
60  &io_tqueue_svc_shutdown
61 };
62 // clang-format on
63 
64 struct io_tqueue {
65  const struct io_dev_vtbl *dev_vptr;
66  struct io_svc svc;
67  io_ctx_t *ctx;
68  io_timer_t *timer;
69  ev_exec_t *exec;
70  struct io_timer_wait wait;
71 #if !LELY_NO_THREADS
72  mtx_t mtx;
73 #endif
74  unsigned shutdown : 1;
75  unsigned submitted : 1;
76  struct timespec next;
77  struct pheap queue;
78 };
79 
80 static void io_tqueue_wait_func(struct ev_task *task);
81 
82 static inline io_tqueue_t *io_tqueue_from_dev(const io_dev_t *dev);
83 static inline io_tqueue_t *io_tqueue_from_svc(const struct io_svc *svc);
84 
85 static void io_tqueue_pop(
86  io_tqueue_t *tq, struct sllist *queue, struct ev_task *task);
87 static void io_tqueue_do_pop_wait(io_tqueue_t *tq, struct sllist *queue,
88  struct io_tqueue_wait *wait);
89 
90 static inline void io_tqueue_wait_post(struct io_tqueue_wait *wait, int errc);
91 static inline size_t io_tqueue_wait_queue_post(struct sllist *queue, int errc);
92 
94  ev_promise_t *promise;
95  struct io_tqueue_wait wait;
96 };
97 
98 static void io_tqueue_async_wait_func(struct ev_task *task);
99 
100 void *
101 io_tqueue_alloc(void)
102 {
103  void *ptr = malloc(sizeof(io_tqueue_t));
104 #if !LELY_NO_ERRNO
105  if (!ptr)
106  set_errc(errno2c(errno));
107 #endif
108  return ptr;
109 }
110 
111 void
112 io_tqueue_free(void *ptr)
113 {
114  free(ptr);
115 }
116 
117 io_tqueue_t *
118 io_tqueue_init(io_tqueue_t *tq, io_timer_t *timer, ev_exec_t *exec)
119 {
120  assert(tq);
121  assert(timer);
122  io_dev_t *dev = io_timer_get_dev(timer);
123  assert(dev);
124  io_ctx_t *ctx = io_dev_get_ctx(dev);
125  assert(ctx);
126  if (!exec)
127  exec = io_dev_get_exec(dev);
128  assert(exec);
129 
130  tq->dev_vptr = &io_tqueue_dev_vtbl;
131 
132  tq->svc = (struct io_svc)IO_SVC_INIT(&io_tqueue_svc_vtbl);
133  tq->ctx = ctx;
134 
135  tq->timer = timer;
136 
137  tq->exec = exec;
138 
139  tq->wait = (struct io_timer_wait)IO_TIMER_WAIT_INIT(
140  tq->exec, &io_tqueue_wait_func);
141 
142 #if !LELY_NO_THREADS
143  if (mtx_init(&tq->mtx, mtx_plain) != thrd_success)
144  return NULL;
145 #endif
146 
147  tq->shutdown = 0;
148  tq->submitted = 0;
149 
150  tq->next = (struct timespec){ 0, 0 };
151 
152  pheap_init(&tq->queue, &timespec_cmp);
153 
154  io_ctx_insert(tq->ctx, &tq->svc);
155 
156  return tq;
157 }
158 
159 void
160 io_tqueue_fini(io_tqueue_t *tq)
161 {
162  assert(tq);
163 
164  io_ctx_remove(tq->ctx, &tq->svc);
165  // Cancel all pending operations.
166  io_tqueue_svc_shutdown(&tq->svc);
167 
168 #if !LELY_NO_THREADS
169  mtx_lock(&tq->mtx);
170  // If necessary, busy-wait until If io_tqueue_wait_func() completes.
171  while (tq->submitted
172  && !ev_exec_abort(tq->wait.task.exec, &tq->wait.task)) {
173  mtx_unlock(&tq->mtx);
174  thrd_yield();
175  mtx_lock(&tq->mtx);
176  }
177  mtx_unlock(&tq->mtx);
178 
179  mtx_destroy(&tq->mtx);
180 #endif
181 }
182 
183 io_tqueue_t *
185 {
186  int errc = 0;
187 
188  io_tqueue_t *tq = io_tqueue_alloc();
189  if (!tq) {
190  errc = get_errc();
191  goto error_alloc;
192  }
193 
194  io_tqueue_t *tmp = io_tqueue_init(tq, timer, exec);
195  if (!tmp) {
196  errc = get_errc();
197  goto error_init;
198  }
199  tq = tmp;
200 
201  return tq;
202 
203 error_init:
204  io_tqueue_free(tq);
205 error_alloc:
206  set_errc(errc);
207  return NULL;
208 }
209 
210 void
212 {
213  if (tq) {
214  io_tqueue_fini(tq);
215  io_tqueue_free(tq);
216  }
217 }
218 
219 io_dev_t *
221 {
222  assert(tq);
223 
224  return &tq->dev_vptr;
225 }
226 
227 io_timer_t *
229 {
230  assert(tq);
231 
232  return tq->timer;
233 }
234 
235 void
237 {
238  assert(tq);
239  assert(wait);
240  struct ev_task *task = &wait->task;
241 
242  if (!task->exec)
243  task->exec = tq->exec;
244  ev_exec_on_task_init(task->exec);
245 
246 #if !LELY_NO_THREADS
247  mtx_lock(&tq->mtx);
248 #endif
249  if (tq->shutdown) {
250 #if !LELY_NO_THREADS
251  mtx_unlock(&tq->mtx);
252 #endif
253  io_tqueue_wait_post(wait, errnum2c(ERRNUM_CANCELED));
254  } else {
255  int errsv = get_errc();
256  int errc = 0;
257  struct sllist queue;
258  sllist_init(&queue);
259 
260  task->_data = &tq->queue;
261  pnode_init(&wait->_node, &wait->value);
262  pheap_insert(&tq->queue, &wait->_node);
263 
264  if ((!tq->next.tv_sec && !tq->next.tv_nsec)
265  || timespec_cmp(&wait->value, &tq->next) < 0) {
266  tq->next = wait->value;
267  struct itimerspec value = { { 0, 0 }, tq->next };
268  // clang-format off
269  if (io_timer_settime(tq->timer, TIMER_ABSTIME, &value,
270  NULL) == -1) {
271  // clang-format on
272  tq->next = (struct timespec){ 0, 0 };
273  errc = get_errc();
274  io_tqueue_do_pop_wait(tq, &queue, NULL);
275  }
276  }
277 
278  int submit = !tq->submitted && !pheap_empty(&tq->queue);
279  if (submit)
280  tq->submitted = 1;
281 #if !LELY_NO_THREADS
282  mtx_unlock(&tq->mtx);
283 #endif
284  // cppcheck-suppress duplicateCondition
285  if (submit)
286  io_timer_submit_wait(tq->timer, &tq->wait);
287 
288  io_tqueue_wait_queue_post(&queue, errc);
289  set_errc(errsv);
290  }
291 }
292 
293 size_t
295 {
296  assert(tq);
297 
298  struct sllist queue;
299  sllist_init(&queue);
300 
301 #if !LELY_NO_THREADS
302  mtx_lock(&tq->mtx);
303 #endif
304  io_tqueue_do_pop_wait(tq, &queue, wait);
305 #if !LELY_NO_THREADS
306  mtx_unlock(&tq->mtx);
307 #endif
308 
309  return io_tqueue_wait_queue_post(&queue, errnum2c(ERRNUM_CANCELED));
310 }
311 
312 size_t
314 {
315  assert(tq);
316 
317  struct sllist queue;
318  sllist_init(&queue);
319 
320 #if !LELY_NO_THREADS
321  mtx_lock(&tq->mtx);
322 #endif
323  io_tqueue_do_pop_wait(tq, &queue, wait);
324 #if !LELY_NO_THREADS
325  mtx_unlock(&tq->mtx);
326 #endif
327 
328  return ev_task_queue_abort(&queue);
329 }
330 
331 ev_future_t *
333  const struct timespec *value, struct io_tqueue_wait **pwait)
334 {
335  assert(value);
336 
337  ev_promise_t *promise = ev_promise_create(
338  sizeof(struct io_tqueue_async_wait), NULL);
339  if (!promise)
340  return NULL;
341  ev_future_t *future = ev_promise_get_future(promise);
342 
343  struct io_tqueue_async_wait *async_wait = ev_promise_data(promise);
344  async_wait->promise = promise;
345  async_wait->wait = (struct io_tqueue_wait)IO_TQUEUE_WAIT_INIT(
346  value->tv_sec, value->tv_nsec, exec,
347  &io_tqueue_async_wait_func);
348 
349  io_tqueue_submit_wait(tq, &async_wait->wait);
350 
351  if (pwait)
352  *pwait = &async_wait->wait;
353 
354  return future;
355 }
356 
357 struct io_tqueue_wait *
359 {
360  return task ? structof(task, struct io_tqueue_wait, task) : NULL;
361 }
362 
363 static io_ctx_t *
364 io_tqueue_dev_get_ctx(const io_dev_t *dev)
365 {
366  const io_tqueue_t *tq = io_tqueue_from_dev(dev);
367 
368  return tq->ctx;
369 }
370 
371 static ev_exec_t *
372 io_tqueue_dev_get_exec(const io_dev_t *dev)
373 {
374  const io_tqueue_t *tq = io_tqueue_from_dev(dev);
375 
376  return tq->exec;
377 }
378 
379 static size_t
380 io_tqueue_dev_cancel(io_dev_t *dev, struct ev_task *task)
381 {
382  io_tqueue_t *tq = io_tqueue_from_dev(dev);
383 
384  struct sllist queue;
385  sllist_init(&queue);
386 
387  io_tqueue_pop(tq, &queue, task);
388 
389  return io_tqueue_wait_queue_post(&queue, errnum2c(ERRNUM_CANCELED));
390 }
391 
392 static size_t
393 io_tqueue_dev_abort(io_dev_t *dev, struct ev_task *task)
394 {
395  io_tqueue_t *tq = io_tqueue_from_dev(dev);
396 
397  struct sllist queue;
398  sllist_init(&queue);
399 
400  io_tqueue_pop(tq, &queue, task);
401 
402  return ev_task_queue_abort(&queue);
403 }
404 
405 static void
406 io_tqueue_svc_shutdown(struct io_svc *svc)
407 {
408  io_tqueue_t *tq = io_tqueue_from_svc(svc);
409  io_dev_t *dev = &tq->dev_vptr;
410 
411 #if !LELY_NO_THREADS
412  mtx_lock(&tq->mtx);
413 #endif
414  int shutdown = !tq->shutdown;
415  tq->shutdown = 1;
416  // Abort io_tqueue_wait_func().
417  if (shutdown && tq->submitted
418  && io_timer_abort_wait(tq->timer, &tq->wait))
419  tq->submitted = 0;
420 #if !LELY_NO_THREADS
421  mtx_unlock(&tq->mtx);
422 #endif
423 
424  if (shutdown)
425  // Cancel all pending operations.
426  io_tqueue_dev_cancel(dev, NULL);
427 }
428 
429 static void
430 io_tqueue_wait_func(struct ev_task *task)
431 {
433  assert(wait);
434  io_tqueue_t *tq = structof(wait, io_tqueue_t, wait);
435 
436  int errsv = get_errc();
437  int errc = 0;
438  struct sllist queue;
439  sllist_init(&queue);
440 
441  struct timespec now = { 0, 0 };
442  if (io_clock_gettime(io_timer_get_clock(tq->timer), &now) == -1) {
443  errc = get_errc();
444 #if !LELY_NO_THREADS
445  mtx_lock(&tq->mtx);
446 #endif
447  assert(tq->submitted);
448  tq->submitted = 0;
449  tq->next = (struct timespec){ 0, 0 };
450 
451  io_tqueue_do_pop_wait(tq, &queue, NULL);
452 #if !LELY_NO_THREADS
453  mtx_unlock(&tq->mtx);
454 #endif
455  } else {
456 #if !LELY_NO_THREADS
457  mtx_lock(&tq->mtx);
458 #endif
459  assert(tq->submitted);
460  tq->submitted = 0;
461  tq->next = (struct timespec){ 0, 0 };
462 
463  struct pnode *node;
464  while ((node = pheap_first(&tq->queue))) {
465  struct io_tqueue_wait *wait = structof(
466  node, struct io_tqueue_wait, _node);
467  if (timespec_cmp(&now, &wait->value) < 0)
468  break;
469  pheap_remove(&tq->queue, node);
470  wait->task._data = NULL;
471  sllist_push_back(&queue, &wait->task._node);
472  }
473 
474  if (node) {
475  struct io_tqueue_wait *wait = structof(
476  node, struct io_tqueue_wait, _node);
477  tq->next = wait->value;
478  struct itimerspec value = { { 0, 0 }, tq->next };
479  // clang-format off
480  if (io_timer_settime(tq->timer, TIMER_ABSTIME, &value,
481  NULL) == -1) {
482  // clang-format on
483  tq->next = (struct timespec){ 0, 0 };
484  errc = get_errc();
485  io_tqueue_do_pop_wait(tq, &queue, NULL);
486  }
487  }
488 
489  int submit = tq->submitted =
490  !pheap_empty(&tq->queue) && !tq->shutdown;
491 #if !LELY_NO_THREADS
492  mtx_unlock(&tq->mtx);
493 #endif
494  if (submit)
495  io_timer_submit_wait(tq->timer, &tq->wait);
496  }
497 
498  io_tqueue_wait_queue_post(&queue, errc);
499 
500  set_errc(errsv);
501 }
502 
503 static inline io_tqueue_t *
504 io_tqueue_from_dev(const io_dev_t *dev)
505 {
506  assert(dev);
507 
508  return structof(dev, io_tqueue_t, dev_vptr);
509 }
510 
511 static inline io_tqueue_t *
512 io_tqueue_from_svc(const struct io_svc *svc)
513 {
514  assert(svc);
515 
516  return structof(svc, io_tqueue_t, svc);
517 }
518 
519 static void
520 io_tqueue_pop(io_tqueue_t *tq, struct sllist *queue, struct ev_task *task)
521 {
522  assert(tq);
523  assert(queue);
524 
525 #if !LELY_NO_THREADS
526  mtx_lock(&tq->mtx);
527 #endif
528  io_tqueue_do_pop_wait(tq, queue, io_tqueue_wait_from_task(task));
529 #if !LELY_NO_THREADS
530  mtx_unlock(&tq->mtx);
531 #endif
532 }
533 
534 static void
535 io_tqueue_do_pop_wait(io_tqueue_t *tq, struct sllist *queue,
536  struct io_tqueue_wait *wait)
537 {
538  assert(tq);
539  assert(queue);
540 
541  if (!wait) {
542  struct pnode *node;
543  while ((node = pheap_first(&tq->queue))) {
544  pheap_remove(&tq->queue, node);
545  struct io_tqueue_wait *wait = structof(
546  node, struct io_tqueue_wait, _node);
547  wait->task._data = NULL;
548  sllist_push_back(queue, &wait->task._node);
549  }
550  } else if (wait->task._data == &tq->queue) {
551  pheap_remove(&tq->queue, &wait->_node);
552  wait->task._data = NULL;
553  sllist_push_back(queue, &wait->task._node);
554  }
555 
556  if (pheap_empty(&tq->queue) && tq->submitted)
557  io_timer_cancel_wait(tq->timer, &tq->wait);
558 }
559 
560 static inline void
561 io_tqueue_wait_post(struct io_tqueue_wait *wait, int errc)
562 {
563  wait->task._data = NULL;
564  wait->errc = errc;
565 
566  ev_exec_t *exec = wait->task.exec;
567  ev_exec_post(exec, &wait->task);
568  ev_exec_on_task_fini(exec);
569 }
570 
571 static inline size_t
572 io_tqueue_wait_queue_post(struct sllist *queue, int errc)
573 {
574  size_t n = 0;
575 
576  struct slnode *node;
577  while ((node = sllist_pop_front(queue))) {
578  struct ev_task *task = ev_task_from_node(node);
580  io_tqueue_wait_post(wait, errc);
581  n += n < SIZE_MAX;
582  }
583 
584  return n;
585 }
586 
587 static void
588 io_tqueue_async_wait_func(struct ev_task *task)
589 {
590  assert(task);
592  struct io_tqueue_async_wait *async_wait =
593  structof(wait, struct io_tqueue_async_wait, wait);
594 
595  ev_promise_set(async_wait->promise, &wait->errc);
596  ev_promise_release(async_wait->promise);
597 }
598 
599 #endif // !LELY_NO_MALLOC
int io_clock_gettime(const io_clock_t *clock, struct timespec *tp)
Obtains the current time value of the specified clock.
Definition: clock.h:96
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
This header file is part of the utilities library; it contains the native and platform-independent er...
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition: errnum.c:810
@ ERRNUM_CANCELED
Operation canceled.
Definition: errnum.h:99
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
This header file is part of the event library; it contains the abstract task executor interface.
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:112
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
void ev_promise_release(ev_promise_t *promise)
Releases a reference to a promise.
Definition: future.c:198
int ev_promise_set(ev_promise_t *promise, void *value)
Satiesfies a promise, if it was not aready satisfied, and stores the specified value for retrieval by...
Definition: future.c:242
ev_future_t * ev_promise_get_future(ev_promise_t *promise)
Returns (a reference to) a future associated with the specified promise.
Definition: future.c:312
ev_promise_t * ev_promise_create(size_t size, ev_promise_dtor_t *dtor)
Constructs a new promise with an optional empty shared state.
Definition: future.c:154
void * ev_promise_data(const ev_promise_t *promise)
Returns a pointer to the shared state of a promise.
Definition: future.c:236
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
int io_timer_settime(io_timer_t *timer, int flags, const struct itimerspec *value, struct itimerspec *ovalue)
Arms or disarms an I/O timer.
Definition: timer.h:256
const struct io_timer_vtbl *const io_timer_t
An abstract timer.
Definition: timer.h:38
struct io_timer_wait * io_timer_wait_from_task(struct ev_task *task)
Obtains a pointer to an I/O timer wait operation from a pointer to its completion task.
Definition: timer.c:62
static size_t io_timer_abort_wait(io_timer_t *timer, struct io_timer_wait *wait)
Aborts the specified I/O timer wait operation if it is pending.
Definition: timer.h:275
static size_t io_timer_cancel_wait(io_timer_t *timer, struct io_timer_wait *wait)
Cancels the specified I/O timer wait operation if it is pending.
Definition: timer.h:269
io_clock_t * io_timer_get_clock(const io_timer_t *timer)
Returns a pointer to the clock used by the timer.
Definition: timer.h:238
io_dev_t * io_timer_get_dev(const io_timer_t *timer)
Returns a pointer to the abstract I/O device representing the timer.
Definition: timer.h:232
void io_timer_submit_wait(io_timer_t *timer, struct io_timer_wait *wait)
Submits a wait operation to an I/O timer.
Definition: timer.h:263
#define IO_TIMER_WAIT_INIT(exec, func)
The static initializer for io_timer_wait.
Definition: timer.h:65
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
io_ctx_t * io_dev_get_ctx(const io_dev_t *dev)
Returns a pointer to the I/O context with which the I/O device is registered.
Definition: dev.h:80
ev_exec_t * io_dev_get_exec(const io_dev_t *dev)
Returns a pointer to the executor used by the I/O device to execute asynchronous tasks.
Definition: dev.h:86
struct pnode * pheap_first(const struct pheap *heap)
Returns a pointer to the first (minimum) node in a pairing heap.
Definition: pheap.h:254
void pheap_insert(struct pheap *heap, struct pnode *node)
Inserts a node into a pairing heap.
Definition: pheap.c:35
void pheap_init(struct pheap *heap, pheap_cmp_t *cmp)
Initializes a pairing heap.
Definition: pheap.h:229
void pheap_remove(struct pheap *heap, struct pnode *node)
Removes a node from a pairing heap.
Definition: pheap.c:63
int pheap_empty(const struct pheap *heap)
Returns 1 if the pairing heap is empty, and 0 if not.
Definition: pheap.h:240
void pnode_init(struct pnode *node, const void *key)
Initializes a node in a paring heap.
Definition: pheap.h:207
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
This is the internal header file of the I/O library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
A future.
Definition: future.c:66
A promise.
Definition: future.c:95
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
Definition: ctx.c:38
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A wait operation suitable for use with an I/O timer.
Definition: timer.h:54
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the wait operation.
Definition: timer.h:59
A wait operation suitable for use with a timer queue.
Definition: tqueue.h:36
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the wait operation.
Definition: tqueue.h:43
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: tqueue.h:48
struct timespec value
The absolute expiration time.
Definition: tqueue.h:38
A pairing heap.
Definition: pheap.h:87
A node in a pairing heap.
Definition: pheap.h:52
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run...
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
io_timer_t * io_tqueue_get_timer(const io_tqueue_t *tq)
Returns a pointer to the I/O timer used by the timer queue.
Definition: tqueue.c:228
void io_tqueue_destroy(io_tqueue_t *tq)
Destroys a timer queue.
Definition: tqueue.c:211
size_t io_tqueue_abort_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Aborts the specified timer queue wait operation if it is pending.
Definition: tqueue.c:313
size_t io_tqueue_cancel_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Cancels the specified timer queue wait operation if it is pending.
Definition: tqueue.c:294
ev_future_t * io_tqueue_async_wait(io_tqueue_t *tq, ev_exec_t *exec, const struct timespec *value, struct io_tqueue_wait **pwait)
Submits an asynchronous wait operation to a timer queue and creates a future which becomes ready once...
Definition: tqueue.c:332
struct io_tqueue_wait * io_tqueue_wait_from_task(struct ev_task *task)
Obtains a pointer to a timer queue wait operation from a pointer to its completion task.
Definition: tqueue.c:358
void io_tqueue_submit_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Submits a wait operation to a timer queue.
Definition: tqueue.c:236
io_tqueue_t * io_tqueue_create(io_timer_t *timer, ev_exec_t *exec)
Creates a new timer queue.
Definition: tqueue.c:184
io_dev_t * io_tqueue_get_dev(const io_tqueue_t *tq)
Returns a pointer to the abstract I/O device representing the timer queue.
Definition: tqueue.c:220
This header file is part of the I/O library; it contains the timer queue declarations.
#define IO_TQUEUE_WAIT_INIT(sec, nsec, exec, func)
The static initializer for io_tqueue_wait.
Definition: tqueue.h:53
This header file is part of the utilities library; it contains the time function declarations.
int timespec_cmp(const void *p1, const void *p2)
Compares two times.
Definition: time.h:251