/*
 * Lely core libraries 2.3.4 -- tqueue.c
 *
 * Source listing taken from the generated documentation of this file.
 */
#include "io2.h"

#if !LELY_NO_MALLOC

#if !LELY_NO_THREADS
#include <lely/libc/threads.h>
#endif
#include <lely/ev/exec.h>
#include <lely/io2/ctx.h>
#include <lely/io2/tqueue.h>
#include <lely/util/errnum.h>
#include <lely/util/time.h>
#include <lely/util/util.h>

#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
40
41static io_ctx_t *io_tqueue_dev_get_ctx(const io_dev_t *dev);
42static ev_exec_t *io_tqueue_dev_get_exec(const io_dev_t *dev);
43static size_t io_tqueue_dev_cancel(io_dev_t *dev, struct ev_task *task);
44static size_t io_tqueue_dev_abort(io_dev_t *dev, struct ev_task *task);
45
46// clang-format off
47static const struct io_dev_vtbl io_tqueue_dev_vtbl = {
48 &io_tqueue_dev_get_ctx,
49 &io_tqueue_dev_get_exec,
50 &io_tqueue_dev_cancel,
51 &io_tqueue_dev_abort
52};
53// clang-format on
54
55static void io_tqueue_svc_shutdown(struct io_svc *svc);
56
57// clang-format off
58static const struct io_svc_vtbl io_tqueue_svc_vtbl = {
59 NULL,
60 &io_tqueue_svc_shutdown
61};
62// clang-format on
63
64struct io_tqueue {
65 const struct io_dev_vtbl *dev_vptr;
66 struct io_svc svc;
67 io_ctx_t *ctx;
68 io_timer_t *timer;
69 ev_exec_t *exec;
70 struct io_timer_wait wait;
71#if !LELY_NO_THREADS
72 mtx_t mtx;
73#endif
74 unsigned shutdown : 1;
75 unsigned submitted : 1;
76 struct timespec next;
77 struct pheap queue;
78};
79
80static void io_tqueue_wait_func(struct ev_task *task);
81
82static inline io_tqueue_t *io_tqueue_from_dev(const io_dev_t *dev);
83static inline io_tqueue_t *io_tqueue_from_svc(const struct io_svc *svc);
84
85static void io_tqueue_pop(
86 io_tqueue_t *tq, struct sllist *queue, struct ev_task *task);
87static void io_tqueue_do_pop_wait(io_tqueue_t *tq, struct sllist *queue,
88 struct io_tqueue_wait *wait);
89
90static inline void io_tqueue_wait_post(struct io_tqueue_wait *wait, int errc);
91static inline size_t io_tqueue_wait_queue_post(struct sllist *queue, int errc);
92
94 ev_promise_t *promise;
95 struct io_tqueue_wait wait;
96};
97
98static void io_tqueue_async_wait_func(struct ev_task *task);
99
100void *
101io_tqueue_alloc(void)
102{
103 void *ptr = malloc(sizeof(io_tqueue_t));
104#if !LELY_NO_ERRNO
105 if (!ptr)
106 set_errc(errno2c(errno));
107#endif
108 return ptr;
109}
110
/**
 * Releases memory obtained from io_tqueue_alloc().
 *
 * @param ptr the pointer to pass to free(); may be NULL.
 */
void
io_tqueue_free(void *ptr)
{
	free(ptr);
}
116
118io_tqueue_init(io_tqueue_t *tq, io_timer_t *timer, ev_exec_t *exec)
119{
120 assert(tq);
121 assert(timer);
122 io_dev_t *dev = io_timer_get_dev(timer);
123 assert(dev);
124 io_ctx_t *ctx = io_dev_get_ctx(dev);
125 assert(ctx);
126 if (!exec)
127 exec = io_dev_get_exec(dev);
128 assert(exec);
129
130 tq->dev_vptr = &io_tqueue_dev_vtbl;
131
132 tq->svc = (struct io_svc)IO_SVC_INIT(&io_tqueue_svc_vtbl);
133 tq->ctx = ctx;
134
135 tq->timer = timer;
136
137 tq->exec = exec;
138
139 tq->wait = (struct io_timer_wait)IO_TIMER_WAIT_INIT(
140 tq->exec, &io_tqueue_wait_func);
141
142#if !LELY_NO_THREADS
143 if (mtx_init(&tq->mtx, mtx_plain) != thrd_success)
144 return NULL;
145#endif
146
147 tq->shutdown = 0;
148 tq->submitted = 0;
149
150 tq->next = (struct timespec){ 0, 0 };
151
152 pheap_init(&tq->queue, &timespec_cmp);
153
154 io_ctx_insert(tq->ctx, &tq->svc);
155
156 return tq;
157}
158
159void
160io_tqueue_fini(io_tqueue_t *tq)
161{
162 assert(tq);
163
164 io_ctx_remove(tq->ctx, &tq->svc);
165 // Cancel all pending operations.
166 io_tqueue_svc_shutdown(&tq->svc);
167
168#if !LELY_NO_THREADS
169 mtx_lock(&tq->mtx);
170 // If necessary, busy-wait until If io_tqueue_wait_func() completes.
171 while (tq->submitted
172 && !ev_exec_abort(tq->wait.task.exec, &tq->wait.task)) {
173 mtx_unlock(&tq->mtx);
174 thrd_yield();
175 mtx_lock(&tq->mtx);
176 }
177 mtx_unlock(&tq->mtx);
178
179 mtx_destroy(&tq->mtx);
180#endif
181}
182
185{
186 int errc = 0;
187
188 io_tqueue_t *tq = io_tqueue_alloc();
189 if (!tq) {
190 errc = get_errc();
191 goto error_alloc;
192 }
193
194 io_tqueue_t *tmp = io_tqueue_init(tq, timer, exec);
195 if (!tmp) {
196 errc = get_errc();
197 goto error_init;
198 }
199 tq = tmp;
200
201 return tq;
202
203error_init:
204 io_tqueue_free(tq);
205error_alloc:
206 set_errc(errc);
207 return NULL;
208}
209
210void
212{
213 if (tq) {
214 io_tqueue_fini(tq);
215 io_tqueue_free(tq);
216 }
217}
218
219io_dev_t *
221{
222 assert(tq);
223
224 return &tq->dev_vptr;
225}
226
229{
230 assert(tq);
231
232 return tq->timer;
233}
234
235void
237{
238 assert(tq);
239 assert(wait);
240 struct ev_task *task = &wait->task;
241
242 if (!task->exec)
243 task->exec = tq->exec;
245
246#if !LELY_NO_THREADS
247 mtx_lock(&tq->mtx);
248#endif
249 if (tq->shutdown) {
250#if !LELY_NO_THREADS
251 mtx_unlock(&tq->mtx);
252#endif
253 io_tqueue_wait_post(wait, errnum2c(ERRNUM_CANCELED));
254 } else {
255 int errsv = get_errc();
256 int errc = 0;
257 struct sllist queue;
258 sllist_init(&queue);
259
260 task->_data = &tq->queue;
261 pnode_init(&wait->_node, &wait->value);
262 pheap_insert(&tq->queue, &wait->_node);
263
264 if ((!tq->next.tv_sec && !tq->next.tv_nsec)
265 || timespec_cmp(&wait->value, &tq->next) < 0) {
266 tq->next = wait->value;
267 struct itimerspec value = { { 0, 0 }, tq->next };
268 // clang-format off
269 if (io_timer_settime(tq->timer, TIMER_ABSTIME, &value,
270 NULL) == -1) {
271 // clang-format on
272 tq->next = (struct timespec){ 0, 0 };
273 errc = get_errc();
274 io_tqueue_do_pop_wait(tq, &queue, NULL);
275 }
276 }
277
278 int submit = !tq->submitted && !pheap_empty(&tq->queue);
279 if (submit)
280 tq->submitted = 1;
281#if !LELY_NO_THREADS
282 mtx_unlock(&tq->mtx);
283#endif
284 // cppcheck-suppress duplicateCondition
285 if (submit)
286 io_timer_submit_wait(tq->timer, &tq->wait);
287
288 io_tqueue_wait_queue_post(&queue, errc);
289 set_errc(errsv);
290 }
291}
292
293size_t
295{
296 assert(tq);
297
298 struct sllist queue;
299 sllist_init(&queue);
300
301#if !LELY_NO_THREADS
302 mtx_lock(&tq->mtx);
303#endif
304 io_tqueue_do_pop_wait(tq, &queue, wait);
305#if !LELY_NO_THREADS
306 mtx_unlock(&tq->mtx);
307#endif
308
309 return io_tqueue_wait_queue_post(&queue, errnum2c(ERRNUM_CANCELED));
310}
311
312size_t
314{
315 assert(tq);
316
317 struct sllist queue;
318 sllist_init(&queue);
319
320#if !LELY_NO_THREADS
321 mtx_lock(&tq->mtx);
322#endif
323 io_tqueue_do_pop_wait(tq, &queue, wait);
324#if !LELY_NO_THREADS
325 mtx_unlock(&tq->mtx);
326#endif
327
328 return ev_task_queue_abort(&queue);
329}
330
333 const struct timespec *value, struct io_tqueue_wait **pwait)
334{
335 assert(value);
336
338 sizeof(struct io_tqueue_async_wait), NULL);
339 if (!promise)
340 return NULL;
341 ev_future_t *future = ev_promise_get_future(promise);
342
343 struct io_tqueue_async_wait *async_wait = ev_promise_data(promise);
344 async_wait->promise = promise;
345 async_wait->wait = (struct io_tqueue_wait)IO_TQUEUE_WAIT_INIT(
346 value->tv_sec, value->tv_nsec, exec,
347 &io_tqueue_async_wait_func);
348
349 io_tqueue_submit_wait(tq, &async_wait->wait);
350
351 if (pwait)
352 *pwait = &async_wait->wait;
353
354 return future;
355}
356
357struct io_tqueue_wait *
359{
360 return task ? structof(task, struct io_tqueue_wait, task) : NULL;
361}
362
363static io_ctx_t *
364io_tqueue_dev_get_ctx(const io_dev_t *dev)
365{
366 const io_tqueue_t *tq = io_tqueue_from_dev(dev);
367
368 return tq->ctx;
369}
370
371static ev_exec_t *
372io_tqueue_dev_get_exec(const io_dev_t *dev)
373{
374 const io_tqueue_t *tq = io_tqueue_from_dev(dev);
375
376 return tq->exec;
377}
378
379static size_t
380io_tqueue_dev_cancel(io_dev_t *dev, struct ev_task *task)
381{
382 io_tqueue_t *tq = io_tqueue_from_dev(dev);
383
384 struct sllist queue;
385 sllist_init(&queue);
386
387 io_tqueue_pop(tq, &queue, task);
388
389 return io_tqueue_wait_queue_post(&queue, errnum2c(ERRNUM_CANCELED));
390}
391
392static size_t
393io_tqueue_dev_abort(io_dev_t *dev, struct ev_task *task)
394{
395 io_tqueue_t *tq = io_tqueue_from_dev(dev);
396
397 struct sllist queue;
398 sllist_init(&queue);
399
400 io_tqueue_pop(tq, &queue, task);
401
402 return ev_task_queue_abort(&queue);
403}
404
405static void
406io_tqueue_svc_shutdown(struct io_svc *svc)
407{
408 io_tqueue_t *tq = io_tqueue_from_svc(svc);
409 io_dev_t *dev = &tq->dev_vptr;
410
411#if !LELY_NO_THREADS
412 mtx_lock(&tq->mtx);
413#endif
414 int shutdown = !tq->shutdown;
415 tq->shutdown = 1;
416 // Abort io_tqueue_wait_func().
417 if (shutdown && tq->submitted
418 && io_timer_abort_wait(tq->timer, &tq->wait))
419 tq->submitted = 0;
420#if !LELY_NO_THREADS
421 mtx_unlock(&tq->mtx);
422#endif
423
424 if (shutdown)
425 // Cancel all pending operations.
426 io_tqueue_dev_cancel(dev, NULL);
427}
428
429static void
430io_tqueue_wait_func(struct ev_task *task)
431{
433 assert(wait);
434 io_tqueue_t *tq = structof(wait, io_tqueue_t, wait);
435
436 int errsv = get_errc();
437 int errc = 0;
438 struct sllist queue;
439 sllist_init(&queue);
440
441 struct timespec now = { 0, 0 };
442 if (io_clock_gettime(io_timer_get_clock(tq->timer), &now) == -1) {
443 errc = get_errc();
444#if !LELY_NO_THREADS
445 mtx_lock(&tq->mtx);
446#endif
447 assert(tq->submitted);
448 tq->submitted = 0;
449 tq->next = (struct timespec){ 0, 0 };
450
451 io_tqueue_do_pop_wait(tq, &queue, NULL);
452#if !LELY_NO_THREADS
453 mtx_unlock(&tq->mtx);
454#endif
455 } else {
456#if !LELY_NO_THREADS
457 mtx_lock(&tq->mtx);
458#endif
459 assert(tq->submitted);
460 tq->submitted = 0;
461 tq->next = (struct timespec){ 0, 0 };
462
463 struct pnode *node;
464 while ((node = pheap_first(&tq->queue))) {
465 struct io_tqueue_wait *wait = structof(
466 node, struct io_tqueue_wait, _node);
467 if (timespec_cmp(&now, &wait->value) < 0)
468 break;
469 pheap_remove(&tq->queue, node);
470 wait->task._data = NULL;
471 sllist_push_back(&queue, &wait->task._node);
472 }
473
474 if (node) {
475 struct io_tqueue_wait *wait = structof(
476 node, struct io_tqueue_wait, _node);
477 tq->next = wait->value;
478 struct itimerspec value = { { 0, 0 }, tq->next };
479 // clang-format off
480 if (io_timer_settime(tq->timer, TIMER_ABSTIME, &value,
481 NULL) == -1) {
482 // clang-format on
483 tq->next = (struct timespec){ 0, 0 };
484 errc = get_errc();
485 io_tqueue_do_pop_wait(tq, &queue, NULL);
486 }
487 }
488
489 int submit = tq->submitted =
490 !pheap_empty(&tq->queue) && !tq->shutdown;
491#if !LELY_NO_THREADS
492 mtx_unlock(&tq->mtx);
493#endif
494 if (submit)
495 io_timer_submit_wait(tq->timer, &tq->wait);
496 }
497
498 io_tqueue_wait_queue_post(&queue, errc);
499
500 set_errc(errsv);
501}
502
503static inline io_tqueue_t *
504io_tqueue_from_dev(const io_dev_t *dev)
505{
506 assert(dev);
507
508 return structof(dev, io_tqueue_t, dev_vptr);
509}
510
511static inline io_tqueue_t *
512io_tqueue_from_svc(const struct io_svc *svc)
513{
514 assert(svc);
515
516 return structof(svc, io_tqueue_t, svc);
517}
518
519static void
520io_tqueue_pop(io_tqueue_t *tq, struct sllist *queue, struct ev_task *task)
521{
522 assert(tq);
523 assert(queue);
524
525#if !LELY_NO_THREADS
526 mtx_lock(&tq->mtx);
527#endif
528 io_tqueue_do_pop_wait(tq, queue, io_tqueue_wait_from_task(task));
529#if !LELY_NO_THREADS
530 mtx_unlock(&tq->mtx);
531#endif
532}
533
534static void
535io_tqueue_do_pop_wait(io_tqueue_t *tq, struct sllist *queue,
536 struct io_tqueue_wait *wait)
537{
538 assert(tq);
539 assert(queue);
540
541 if (!wait) {
542 struct pnode *node;
543 while ((node = pheap_first(&tq->queue))) {
544 pheap_remove(&tq->queue, node);
545 struct io_tqueue_wait *wait = structof(
546 node, struct io_tqueue_wait, _node);
547 wait->task._data = NULL;
548 sllist_push_back(queue, &wait->task._node);
549 }
550 } else if (wait->task._data == &tq->queue) {
551 pheap_remove(&tq->queue, &wait->_node);
552 wait->task._data = NULL;
553 sllist_push_back(queue, &wait->task._node);
554 }
555
556 if (pheap_empty(&tq->queue) && tq->submitted)
557 io_timer_cancel_wait(tq->timer, &tq->wait);
558}
559
560static inline void
561io_tqueue_wait_post(struct io_tqueue_wait *wait, int errc)
562{
563 wait->task._data = NULL;
564 wait->errc = errc;
565
566 ev_exec_t *exec = wait->task.exec;
567 ev_exec_post(exec, &wait->task);
569}
570
// Posts the completion task of every wait operation in *queue with the
// specified error code, emptying the queue. Returns the number of posted
// tasks, saturated at SIZE_MAX.
static inline size_t
io_tqueue_wait_queue_post(struct sllist *queue, int errc)
{
	size_t n = 0;

	struct slnode *node;
	while ((node = sllist_pop_front(queue))) {
		struct ev_task *task = ev_task_from_node(node);
		struct io_tqueue_wait *wait = io_tqueue_wait_from_task(task);
		io_tqueue_wait_post(wait, errc);
		// Increment without overflowing.
		n += n < SIZE_MAX;
	}

	return n;
}
586
587static void
588io_tqueue_async_wait_func(struct ev_task *task)
589{
590 assert(task);
592 struct io_tqueue_async_wait *async_wait =
593 structof(wait, struct io_tqueue_async_wait, wait);
594
595 ev_promise_set(async_wait->promise, &wait->errc);
596 ev_promise_release(async_wait->promise);
597}
598
599#endif // !LELY_NO_MALLOC
int io_clock_gettime(const io_clock_t *clock, struct timespec *tp)
Obtains the current time value of the specified clock.
Definition: clock.h:96
This header file is part of the I/O library; it contains the I/O context and service declarations.
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
This header file is part of the utilities library; it contains the native and platform-independent er...
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition: errnum.c:810
@ ERRNUM_CANCELED
Operation canceled.
Definition: errnum.h:99
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
This header file is part of the event library; it contains the abstract task executor interface.
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:112
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
void ev_promise_release(ev_promise_t *promise)
Releases a reference to a promise.
Definition: future.c:198
ev_future_t * ev_promise_get_future(ev_promise_t *promise)
Returns (a reference to) a future associated with the specified promise.
Definition: future.c:312
int ev_promise_set(ev_promise_t *promise, void *value)
Satisfies a promise, if it was not already satisfied, and stores the specified value for retrieval by...
Definition: future.c:242
void * ev_promise_data(const ev_promise_t *promise)
Returns a pointer to the shared state of a promise.
Definition: future.c:236
ev_promise_t * ev_promise_create(size_t size, ev_promise_dtor_t *dtor)
Constructs a new promise with an optional empty shared state.
Definition: future.c:154
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
int io_timer_settime(io_timer_t *timer, int flags, const struct itimerspec *value, struct itimerspec *ovalue)
Arms or disarms an I/O timer.
Definition: timer.h:256
const struct io_timer_vtbl *const io_timer_t
An abstract timer.
Definition: timer.h:38
static size_t io_timer_abort_wait(io_timer_t *timer, struct io_timer_wait *wait)
Aborts the specified I/O timer wait operation if it is pending.
Definition: timer.h:275
static size_t io_timer_cancel_wait(io_timer_t *timer, struct io_timer_wait *wait)
Cancels the specified I/O timer wait operation if it is pending.
Definition: timer.h:269
io_clock_t * io_timer_get_clock(const io_timer_t *timer)
Returns a pointer to the clock used by the timer.
Definition: timer.h:238
io_dev_t * io_timer_get_dev(const io_timer_t *timer)
Returns a pointer to the abstract I/O device representing the timer.
Definition: timer.h:232
void io_timer_submit_wait(io_timer_t *timer, struct io_timer_wait *wait)
Submits a wait operation to an I/O timer.
Definition: timer.h:263
struct io_timer_wait * io_timer_wait_from_task(struct ev_task *task)
Obtains a pointer to an I/O timer wait operation from a pointer to its completion task.
Definition: timer.c:62
#define IO_TIMER_WAIT_INIT(exec, func)
The static initializer for io_timer_wait.
Definition: timer.h:65
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
io_ctx_t * io_dev_get_ctx(const io_dev_t *dev)
Returns a pointer to the I/O context with which the I/O device is registered.
Definition: dev.h:80
ev_exec_t * io_dev_get_exec(const io_dev_t *dev)
Returns a pointer to the executor used by the I/O device to execute asynchronous tasks.
Definition: dev.h:86
struct pnode * pheap_first(const struct pheap *heap)
Returns a pointer to the first (minimum) node in a pairing heap.
Definition: pheap.h:254
void pheap_insert(struct pheap *heap, struct pnode *node)
Inserts a node into a pairing heap.
Definition: pheap.c:35
void pheap_init(struct pheap *heap, pheap_cmp_t *cmp)
Initializes a pairing heap.
Definition: pheap.h:229
void pheap_remove(struct pheap *heap, struct pnode *node)
Removes a node from a pairing heap.
Definition: pheap.c:63
int pheap_empty(const struct pheap *heap)
Returns 1 if the pairing heap is empty, and 0 if not.
Definition: pheap.h:240
void pnode_init(struct pnode *node, const void *key)
Initializes a node in a pairing heap.
Definition: pheap.h:207
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
This is the internal header file of the I/O library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
A future.
Definition: future.c:66
A promise.
Definition: future.c:95
An executable task.
Definition: task.h:41
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
Definition: ctx.c:38
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A wait operation suitable for use with an I/O timer.
Definition: timer.h:54
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the wait operation.
Definition: timer.h:59
A wait operation suitable for use with a timer queue.
Definition: tqueue.h:36
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the wait operation.
Definition: tqueue.h:43
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled.
Definition: tqueue.h:48
struct timespec value
The absolute expiration time.
Definition: tqueue.h:38
A pairing heap.
Definition: pheap.h:87
A node in a pairing heap.
Definition: pheap.h:52
A singly-linked list.
Definition: sllist.h:52
A node in a singly-linked list.
Definition: sllist.h:40
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run...
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
void io_tqueue_destroy(io_tqueue_t *tq)
Destroys a timer queue.
Definition: tqueue.c:211
struct io_tqueue_wait * io_tqueue_wait_from_task(struct ev_task *task)
Obtains a pointer to a timer queue wait operation from a pointer to its completion task.
Definition: tqueue.c:358
size_t io_tqueue_abort_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Aborts the specified timer queue wait operation if it is pending.
Definition: tqueue.c:313
io_tqueue_t * io_tqueue_create(io_timer_t *timer, ev_exec_t *exec)
Creates a new timer queue.
Definition: tqueue.c:184
size_t io_tqueue_cancel_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Cancels the specified timer queue wait operation if it is pending.
Definition: tqueue.c:294
io_dev_t * io_tqueue_get_dev(const io_tqueue_t *tq)
Returns a pointer to the abstract I/O device representing the timer queue.
Definition: tqueue.c:220
io_timer_t * io_tqueue_get_timer(const io_tqueue_t *tq)
Returns a pointer to the I/O timer used by the timer queue.
Definition: tqueue.c:228
void io_tqueue_submit_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Submits a wait operation to a timer queue.
Definition: tqueue.c:236
ev_future_t * io_tqueue_async_wait(io_tqueue_t *tq, ev_exec_t *exec, const struct timespec *value, struct io_tqueue_wait **pwait)
Submits an asynchronous wait operation to a timer queue and creates a future which becomes ready once...
Definition: tqueue.c:332
This header file is part of the I/O library; it contains the timer queue declarations.
#define IO_TQUEUE_WAIT_INIT(sec, nsec, exec, func)
The static initializer for io_tqueue_wait.
Definition: tqueue.h:53
This header file is part of the utilities library; it contains the time function declarations.
int timespec_cmp(const void *p1, const void *p2)
Compares two times.
Definition: time.h:251