Lely core libraries  2.2.5
tqueue.c
Go to the documentation of this file.
1 
24 #include "io2.h"
25 #if !LELY_NO_THREADS
26 #include <lely/libc/threads.h>
27 #endif
28 #include <lely/ev/exec.h>
29 #include <lely/io2/ctx.h>
30 #include <lely/io2/tqueue.h>
31 #include <lely/util/errnum.h>
32 #include <lely/util/time.h>
33 #include <lely/util/util.h>
34 
35 #include <assert.h>
36 #include <stdlib.h>
37 
38 static io_ctx_t *io_tqueue_dev_get_ctx(const io_dev_t *dev);
39 static ev_exec_t *io_tqueue_dev_get_exec(const io_dev_t *dev);
40 static size_t io_tqueue_dev_cancel(io_dev_t *dev, struct ev_task *task);
41 static size_t io_tqueue_dev_abort(io_dev_t *dev, struct ev_task *task);
42 
43 // clang-format off
44 static const struct io_dev_vtbl io_tqueue_dev_vtbl = {
45  &io_tqueue_dev_get_ctx,
46  &io_tqueue_dev_get_exec,
47  &io_tqueue_dev_cancel,
48  &io_tqueue_dev_abort
49 };
50 // clang-format on
51 
52 static void io_tqueue_svc_shutdown(struct io_svc *svc);
53 
54 // clang-format off
55 static const struct io_svc_vtbl io_tqueue_svc_vtbl = {
56  NULL,
57  &io_tqueue_svc_shutdown
58 };
59 // clang-format on
60 
61 struct io_tqueue {
62  const struct io_dev_vtbl *dev_vptr;
63  struct io_svc svc;
64  io_ctx_t *ctx;
65  io_timer_t *timer;
66  ev_exec_t *exec;
67  struct io_timer_wait wait;
68 #if !LELY_NO_THREADS
69  mtx_t mtx;
70 #endif
71  unsigned shutdown : 1;
72  unsigned submitted : 1;
73  struct timespec next;
74  struct pheap queue;
75 };
76 
77 static void io_tqueue_wait_func(struct ev_task *task);
78 
79 static inline io_tqueue_t *io_tqueue_from_dev(const io_dev_t *dev);
80 static inline io_tqueue_t *io_tqueue_from_svc(const struct io_svc *svc);
81 
82 static void io_tqueue_pop(
83  io_tqueue_t *tq, struct sllist *queue, struct ev_task *task);
84 static void io_tqueue_do_pop_wait(io_tqueue_t *tq, struct sllist *queue,
85  struct io_tqueue_wait *wait);
86 
87 static inline void io_tqueue_wait_post(struct io_tqueue_wait *wait, int errc);
88 static inline size_t io_tqueue_wait_queue_post(struct sllist *queue, int errc);
89 
91  ev_promise_t *promise;
92  struct io_tqueue_wait wait;
93 };
94 
95 static void io_tqueue_async_wait_func(struct ev_task *task);
96 
97 void *
98 io_tqueue_alloc(void)
99 {
100  void *ptr = malloc(sizeof(io_tqueue_t));
101  if (!ptr)
102  set_errc(errno2c(errno));
103  return ptr;
104 }
105 
106 void
107 io_tqueue_free(void *ptr)
108 {
109  free(ptr);
110 }
111 
112 io_tqueue_t *
113 io_tqueue_init(io_tqueue_t *tq, io_timer_t *timer, ev_exec_t *exec)
114 {
115  assert(tq);
116  assert(timer);
117  io_dev_t *dev = io_timer_get_dev(timer);
118  assert(dev);
119  io_ctx_t *ctx = io_dev_get_ctx(dev);
120  assert(ctx);
121  if (!exec)
122  exec = io_dev_get_exec(dev);
123  assert(exec);
124 
125  tq->dev_vptr = &io_tqueue_dev_vtbl;
126 
127  tq->svc = (struct io_svc)IO_SVC_INIT(&io_tqueue_svc_vtbl);
128  tq->ctx = ctx;
129 
130  tq->timer = timer;
131 
132  tq->exec = exec;
133 
134  tq->wait = (struct io_timer_wait)IO_TIMER_WAIT_INIT(
135  tq->exec, &io_tqueue_wait_func);
136 
137 #if !LELY_NO_THREADS
138  if (mtx_init(&tq->mtx, mtx_plain) != thrd_success)
139  return NULL;
140 #endif
141 
142  tq->shutdown = 0;
143  tq->submitted = 0;
144 
145  tq->next = (struct timespec){ 0, 0 };
146 
147  pheap_init(&tq->queue, &timespec_cmp);
148 
149  io_ctx_insert(tq->ctx, &tq->svc);
150 
151  return tq;
152 }
153 
154 void
155 io_tqueue_fini(io_tqueue_t *tq)
156 {
157  assert(tq);
158 
159  io_ctx_remove(tq->ctx, &tq->svc);
160  // Cancel all pending operations.
161  io_tqueue_svc_shutdown(&tq->svc);
162 
163 #if !LELY_NO_THREADS
164  mtx_lock(&tq->mtx);
165  // If necessary, busy-wait until If io_tqueue_wait_func() completes.
166  while (tq->submitted
167  && !ev_exec_abort(tq->wait.task.exec, &tq->wait.task)) {
168  mtx_unlock(&tq->mtx);
169  thrd_yield();
170  mtx_lock(&tq->mtx);
171  }
172  mtx_unlock(&tq->mtx);
173 
174  mtx_destroy(&tq->mtx);
175 #endif
176 }
177 
178 io_tqueue_t *
180 {
181  int errc = 0;
182 
183  io_tqueue_t *tq = io_tqueue_alloc();
184  if (!tq) {
185  errc = get_errc();
186  goto error_alloc;
187  }
188 
189  io_tqueue_t *tmp = io_tqueue_init(tq, timer, exec);
190  if (!tmp) {
191  errc = get_errc();
192  goto error_init;
193  }
194  tq = tmp;
195 
196  return tq;
197 
198 error_init:
199  io_tqueue_free(tq);
200 error_alloc:
201  set_errc(errc);
202  return NULL;
203 }
204 
205 void
207 {
208  if (tq) {
209  io_tqueue_fini(tq);
210  io_tqueue_free(tq);
211  }
212 }
213 
214 io_dev_t *
216 {
217  assert(tq);
218 
219  return &tq->dev_vptr;
220 }
221 
222 io_timer_t *
224 {
225  assert(tq);
226 
227  return tq->timer;
228 }
229 
230 void
232 {
233  assert(tq);
234  assert(wait);
235  struct ev_task *task = &wait->task;
236 
237  if (!task->exec)
238  task->exec = tq->exec;
239  ev_exec_on_task_init(task->exec);
240 
241 #if !LELY_NO_THREADS
242  mtx_lock(&tq->mtx);
243 #endif
244  if (tq->shutdown) {
245 #if !LELY_NO_THREADS
246  mtx_unlock(&tq->mtx);
247 #endif
248  io_tqueue_wait_post(wait, errnum2c(ERRNUM_CANCELED));
249  } else {
250  int errsv = get_errc();
251  int errc = 0;
252  struct sllist queue;
253  sllist_init(&queue);
254 
255  task->_data = &tq->queue;
256  pnode_init(&wait->_node, &wait->value);
257  pheap_insert(&tq->queue, &wait->_node);
258 
259  if ((!tq->next.tv_sec && !tq->next.tv_nsec)
260  || timespec_cmp(&wait->value, &tq->next) < 0) {
261  tq->next = wait->value;
262  struct itimerspec value = { { 0, 0 }, tq->next };
263  // clang-format off
264  if (io_timer_settime(tq->timer, TIMER_ABSTIME, &value,
265  NULL) == -1) {
266  // clang-format on
267  tq->next = (struct timespec){ 0, 0 };
268  errc = get_errc();
269  io_tqueue_do_pop_wait(tq, &queue, NULL);
270  }
271  }
272 
273  int submit = !tq->submitted && !pheap_empty(&tq->queue);
274  if (submit)
275  tq->submitted = 1;
276 #if !LELY_NO_THREADS
277  mtx_unlock(&tq->mtx);
278 #endif
279  if (submit)
280  io_timer_submit_wait(tq->timer, &tq->wait);
281 
282  io_tqueue_wait_queue_post(&queue, errc);
283  set_errc(errsv);
284  }
285 }
286 
287 size_t
289 {
290  assert(tq);
291 
292  struct sllist queue;
293  sllist_init(&queue);
294 
295 #if !LELY_NO_THREADS
296  mtx_lock(&tq->mtx);
297 #endif
298  io_tqueue_do_pop_wait(tq, &queue, wait);
299 #if !LELY_NO_THREADS
300  mtx_unlock(&tq->mtx);
301 #endif
302 
303  return io_tqueue_wait_queue_post(&queue, errnum2c(ERRNUM_CANCELED));
304 }
305 
306 size_t
308 {
309  assert(tq);
310 
311  struct sllist queue;
312  sllist_init(&queue);
313 
314 #if !LELY_NO_THREADS
315  mtx_lock(&tq->mtx);
316 #endif
317  io_tqueue_do_pop_wait(tq, &queue, wait);
318 #if !LELY_NO_THREADS
319  mtx_unlock(&tq->mtx);
320 #endif
321 
322  return ev_task_queue_abort(&queue);
323 }
324 
325 ev_future_t *
327  const struct timespec *value, struct io_tqueue_wait **pwait)
328 {
329  assert(value);
330 
331  ev_promise_t *promise = ev_promise_create(
332  sizeof(struct io_tqueue_async_wait), NULL);
333  if (!promise)
334  return NULL;
335 
336  struct io_tqueue_async_wait *async_wait = ev_promise_data(promise);
337  async_wait->promise = promise;
338  async_wait->wait = (struct io_tqueue_wait)IO_TQUEUE_WAIT_INIT(
339  value->tv_sec, value->tv_nsec, exec,
340  &io_tqueue_async_wait_func);
341 
342  io_tqueue_submit_wait(tq, &async_wait->wait);
343 
344  if (pwait)
345  *pwait = &async_wait->wait;
346 
347  return ev_promise_get_future(promise);
348 }
349 
350 struct io_tqueue_wait *
352 {
353  return task ? structof(task, struct io_tqueue_wait, task) : NULL;
354 }
355 
356 static io_ctx_t *
357 io_tqueue_dev_get_ctx(const io_dev_t *dev)
358 {
359  const io_tqueue_t *tq = io_tqueue_from_dev(dev);
360 
361  return tq->ctx;
362 }
363 
364 static ev_exec_t *
365 io_tqueue_dev_get_exec(const io_dev_t *dev)
366 {
367  const io_tqueue_t *tq = io_tqueue_from_dev(dev);
368 
369  return tq->exec;
370 }
371 
372 static size_t
373 io_tqueue_dev_cancel(io_dev_t *dev, struct ev_task *task)
374 {
375  io_tqueue_t *tq = io_tqueue_from_dev(dev);
376 
377  struct sllist queue;
378  sllist_init(&queue);
379 
380  io_tqueue_pop(tq, &queue, task);
381 
382  return io_tqueue_wait_queue_post(&queue, errnum2c(ERRNUM_CANCELED));
383 }
384 
385 static size_t
386 io_tqueue_dev_abort(io_dev_t *dev, struct ev_task *task)
387 {
388  io_tqueue_t *tq = io_tqueue_from_dev(dev);
389 
390  struct sllist queue;
391  sllist_init(&queue);
392 
393  io_tqueue_pop(tq, &queue, task);
394 
395  return ev_task_queue_abort(&queue);
396 }
397 
398 static void
399 io_tqueue_svc_shutdown(struct io_svc *svc)
400 {
401  io_tqueue_t *tq = io_tqueue_from_svc(svc);
402  io_dev_t *dev = &tq->dev_vptr;
403 
404 #if !LELY_NO_THREADS
405  mtx_lock(&tq->mtx);
406 #endif
407  int shutdown = !tq->shutdown;
408  tq->shutdown = 1;
409  // Abort io_tqueue_wait_func().
410  if (shutdown && tq->submitted
411  && io_timer_abort_wait(tq->timer, &tq->wait))
412  tq->submitted = 0;
413 #if !LELY_NO_THREADS
414  mtx_unlock(&tq->mtx);
415 #endif
416 
417  if (shutdown)
418  // Cancel all pending operations.
419  io_tqueue_dev_cancel(dev, NULL);
420 }
421 
422 static void
423 io_tqueue_wait_func(struct ev_task *task)
424 {
425  struct io_timer_wait *wait = io_timer_wait_from_task(task);
426  assert(wait);
427  io_tqueue_t *tq = structof(wait, io_tqueue_t, wait);
428 
429  int errsv = get_errc();
430  int errc = 0;
431  struct sllist queue;
432  sllist_init(&queue);
433 
434  struct timespec now = { 0, 0 };
435  if (io_clock_gettime(io_timer_get_clock(tq->timer), &now) == -1) {
436  errc = get_errc();
437 #if !LELY_NO_THREADS
438  mtx_lock(&tq->mtx);
439 #endif
440  assert(tq->submitted);
441  tq->submitted = 0;
442  tq->next = (struct timespec){ 0, 0 };
443 
444  io_tqueue_do_pop_wait(tq, &queue, NULL);
445 #if !LELY_NO_THREADS
446  mtx_unlock(&tq->mtx);
447 #endif
448  } else {
449 #if !LELY_NO_THREADS
450  mtx_lock(&tq->mtx);
451 #endif
452  assert(tq->submitted);
453  tq->submitted = 0;
454  tq->next = (struct timespec){ 0, 0 };
455 
456  struct pnode *node;
457  while ((node = pheap_first(&tq->queue))) {
458  struct io_tqueue_wait *wait = structof(
459  node, struct io_tqueue_wait, _node);
460  if (timespec_cmp(&now, &wait->value) < 0)
461  break;
462  pheap_remove(&tq->queue, node);
463  wait->task._data = NULL;
464  sllist_push_back(&queue, &wait->task._node);
465  }
466 
467  if (node) {
468  struct io_tqueue_wait *wait = structof(
469  node, struct io_tqueue_wait, _node);
470  tq->next = wait->value;
471  struct itimerspec value = { { 0, 0 }, tq->next };
472  // clang-format off
473  if (io_timer_settime(tq->timer, TIMER_ABSTIME, &value,
474  NULL) == -1) {
475  // clang-format on
476  tq->next = (struct timespec){ 0, 0 };
477  errc = get_errc();
478  io_tqueue_do_pop_wait(tq, &queue, NULL);
479  }
480  }
481 
482  int submit = tq->submitted =
483  !pheap_empty(&tq->queue) && !tq->shutdown;
484 #if !LELY_NO_THREADS
485  mtx_unlock(&tq->mtx);
486 #endif
487  if (submit)
488  io_timer_submit_wait(tq->timer, &tq->wait);
489  }
490 
491  io_tqueue_wait_queue_post(&queue, errc);
492 
493  set_errc(errsv);
494 }
495 
496 static inline io_tqueue_t *
497 io_tqueue_from_dev(const io_dev_t *dev)
498 {
499  assert(dev);
500 
501  return structof(dev, io_tqueue_t, dev_vptr);
502 }
503 
504 static inline io_tqueue_t *
505 io_tqueue_from_svc(const struct io_svc *svc)
506 {
507  assert(svc);
508 
509  return structof(svc, io_tqueue_t, svc);
510 }
511 
512 static void
513 io_tqueue_pop(io_tqueue_t *tq, struct sllist *queue, struct ev_task *task)
514 {
515  assert(tq);
516  assert(queue);
517 
518 #if !LELY_NO_THREADS
519  mtx_lock(&tq->mtx);
520 #endif
521  io_tqueue_do_pop_wait(tq, queue, io_tqueue_wait_from_task(task));
522 #if !LELY_NO_THREADS
523  mtx_unlock(&tq->mtx);
524 #endif
525 }
526 
527 static void
528 io_tqueue_do_pop_wait(io_tqueue_t *tq, struct sllist *queue,
529  struct io_tqueue_wait *wait)
530 {
531  assert(tq);
532  assert(queue);
533 
534  if (!wait) {
535  struct pnode *node;
536  while ((node = pheap_first(&tq->queue))) {
537  pheap_remove(&tq->queue, node);
538  struct io_tqueue_wait *wait = structof(
539  node, struct io_tqueue_wait, _node);
540  wait->task._data = NULL;
541  sllist_push_back(queue, &wait->task._node);
542  }
543  } else if (wait->task._data == &tq->queue) {
544  pheap_remove(&tq->queue, &wait->_node);
545  wait->task._data = NULL;
546  sllist_push_back(queue, &wait->task._node);
547  }
548 
549  if (pheap_empty(&tq->queue) && tq->submitted)
550  io_timer_cancel_wait(tq->timer, &tq->wait);
551 }
552 
553 static inline void
554 io_tqueue_wait_post(struct io_tqueue_wait *wait, int errc)
555 {
556  wait->task._data = NULL;
557  wait->errc = errc;
558 
559  ev_exec_t *exec = wait->task.exec;
560  ev_exec_post(exec, &wait->task);
561  ev_exec_on_task_fini(exec);
562 }
563 
564 static inline size_t
565 io_tqueue_wait_queue_post(struct sllist *queue, int errc)
566 {
567  size_t n = 0;
568 
569  struct slnode *node;
570  while ((node = sllist_pop_front(queue))) {
571  struct ev_task *task = ev_task_from_node(node);
572  struct io_tqueue_wait *wait = io_tqueue_wait_from_task(task);
573  io_tqueue_wait_post(wait, errc);
574  n += n < SIZE_MAX;
575  }
576 
577  return n;
578 }
579 
580 static void
581 io_tqueue_async_wait_func(struct ev_task *task)
582 {
583  assert(task);
584  struct io_tqueue_wait *wait = io_tqueue_wait_from_task(task);
585  struct io_tqueue_async_wait *async_wait =
586  structof(wait, struct io_tqueue_async_wait, wait);
587 
588  ev_promise_set(async_wait->promise, &wait->errc);
589  ev_promise_release(async_wait->promise);
590 }
void io_tqueue_submit_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Submits a wait operation to a timer queue.
Definition: tqueue.c:231
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:126
void pnode_init(struct pnode *node, const void *key)
Initializes a node in a paring heap.
Definition: pheap.h:196
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
void * ev_promise_data(const ev_promise_t *promise)
Returns a pointer to the shared state of a promise.
Definition: future.c:233
void ev_promise_release(ev_promise_t *promise)
Releases a reference to a promise.
Definition: future.c:195
struct timespec value
The absolute expiration time.
Definition: tqueue.h:38
This header file is part of the I/O library; it contains the I/O context and service declarations...
io_clock_t * io_timer_get_clock(const io_timer_t *timer)
Returns a pointer to the clock used by the timer.
Definition: timer.h:238
io_tqueue_t * io_tqueue_create(io_timer_t *timer, ev_exec_t *exec)
Creates a new timer queue.
Definition: tqueue.c:179
size_t io_tqueue_abort_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Aborts the specified timer queue wait operation if it is pending.
Definition: tqueue.c:307
size_t ev_task_queue_abort(struct sllist *queue)
Aborts the tasks in queue by invoking ev_exec_on_task_fini() for each of them.
Definition: task.c:55
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
int errnum2c(errnum_t errnum)
Transforms a platform-independent error number to a native error code.
Definition: errnum.c:825
int pheap_empty(const struct pheap *heap)
Returns 1 if the pairing heap is empty, and 0 if not.
Definition: pheap.h:222
ev_exec_t * io_dev_get_exec(const io_dev_t *dev)
Returns a pointer to the executor used by the I/O device to execute asynchronous tasks.
Definition: dev.h:86
Indicates that the requested operation succeeded.
Definition: threads.h:121
io_dev_t * io_timer_get_dev(const io_timer_t *timer)
Returns a pointer to the abstract I/O device representing the timer.
Definition: timer.h:232
A wait operation suitable for use with a timer queue.
Definition: tqueue.h:36
Definition: ctx.c:35
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:114
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:138
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:121
A node in a singly-linked list.
Definition: sllist.h:39
ev_promise_t * ev_promise_create(size_t size, ev_promise_dtor_t *dtor)
Constructs a new promise with an optional empty shared state.
Definition: future.c:151
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:221
int errc
The error number, obtained as if by get_errc(), if an error occurred or the operation was canceled...
Definition: tqueue.h:48
A future.
Definition: future.c:63
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109
io_dev_t * io_tqueue_get_dev(const io_tqueue_t *tq)
Returns a pointer to the abstract I/O device representing the timer queue.
Definition: tqueue.c:215
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:957
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:213
Operation canceled.
Definition: errnum.h:96
This header file is part of the C11 and POSIX compatibility library; it includes <threads.h>, if it exists, and defines any missing functionality.
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
This header file is part of the utilities library; it contains the native and platform-independent er...
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
This header file is part of the utilities library; it contains the time function declarations.
size_t io_tqueue_cancel_wait(io_tqueue_t *tq, struct io_tqueue_wait *wait)
Cancels the specified timer queue wait operation if it is pending.
Definition: tqueue.c:288
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the wait operation.
Definition: tqueue.h:43
An I/O service.
Definition: ctx.h:49
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future...
Definition: exec.h:108
A singly-linked list.
Definition: sllist.h:51
This header file is part of the I/O library; it contains the timer queue declarations.
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function...
Definition: errnum.c:947
io_timer_t * io_tqueue_get_timer(const io_tqueue_t *tq)
Returns a pointer to the I/O timer used by the timer queue.
Definition: tqueue.c:223
ev_future_t * ev_promise_get_future(ev_promise_t *promise)
Returns (a reference to) a future associated with the specified promise.
Definition: future.c:309
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run...
ev_future_t * io_tqueue_async_wait(io_tqueue_t *tq, ev_exec_t *exec, const struct timespec *value, struct io_tqueue_wait **pwait)
Submits an asynchronous wait operation to a timer queue and creates a future which becomes ready once...
Definition: tqueue.c:326
void io_timer_submit_wait(io_timer_t *timer, struct io_timer_wait *wait)
Submits a wait operation to an I/O timer.
Definition: timer.h:263
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:43
int io_timer_settime(io_timer_t *timer, int flags, const struct itimerspec *value, struct itimerspec *ovalue)
Arms or disarms an I/O timer.
Definition: timer.h:256
void pheap_insert(struct pheap *heap, struct pnode *node)
Inserts a node into a pairing heap.
Definition: pheap.c:37
void io_tqueue_destroy(io_tqueue_t *tq)
Destroys a timer queue.
Definition: tqueue.c:206
int timespec_cmp(const void *p1, const void *p2)
Compares two times.
Definition: time.h:228
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:136
static size_t io_timer_cancel_wait(io_timer_t *timer, struct io_timer_wait *wait)
Cancels the specified I/O timer wait operation if it is pending.
Definition: timer.h:269
io_ctx_t * io_dev_get_ctx(const io_dev_t *dev)
Returns a pointer to the I/O context with which the I/O device is registered.
Definition: dev.h:80
void pheap_remove(struct pheap *heap, struct pnode *node)
Removes a node from a pairing heap.
Definition: pheap.c:65
struct ev_task task
The task (to be) submitted upon completion (or cancellation) of the wait operation.
Definition: timer.h:59
An executable task.
Definition: task.h:41
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
This header file is part of the event library; it contains the abstract task executor interface...
struct io_tqueue_wait * io_tqueue_wait_from_task(struct ev_task *task)
Obtains a pointer to a timer queue wait operation from a pointer to its completion task...
Definition: tqueue.c:351
int ev_promise_set(ev_promise_t *promise, void *value)
Satiesfies a promise, if it was not aready satisfied, and stores the specified value for retrieval by...
Definition: future.c:239
The virtual table of an I/O service.
Definition: ctx.h:67
A wait operation suitable for use with an I/O timer.
Definition: timer.h:54
A pairing heap.
Definition: pheap.h:86
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node...
Definition: task.c:32
const struct io_timer_vtbl *const io_timer_t
An abstract timer.
Definition: timer.h:38
struct io_timer_wait * io_timer_wait_from_task(struct ev_task *task)
Obtains a pointer to an I/O timer wait operation from a pointer to its completion task...
Definition: timer.c:61
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib.h> and defines any missing functionality.
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:184
A promise.
Definition: future.c:92
const struct io_dev_vtbl *const io_dev_t
An abstract I/O device.
Definition: dev.h:35
int io_clock_gettime(const io_clock_t *clock, struct timespec *tp)
Obtains the current time value of the specified clock.
Definition: clock.h:96
struct pnode * pheap_first(const struct pheap *heap)
Returns a pointer to the first (minimum) node in a pairing heap.
Definition: pheap.h:234
void pheap_init(struct pheap *heap, pheap_cmp_t *cmp)
Initializes a pairing heap.
Definition: pheap.h:214
This is the internal header file of the I/O library.
This is the public header file of the utilities library.
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values: ...
A node in a pairing heap.
Definition: pheap.h:51
static size_t io_timer_abort_wait(io_timer_t *timer, struct io_timer_wait *wait)
Aborts the specified I/O timer wait operation if it is pending.
Definition: timer.h:275