Lely core libraries 2.3.4
poll.c
Go to the documentation of this file.
1
24#include "io.h"
25
26#if !LELY_NO_STDIO && _POSIX_C_SOURCE >= 200112L
27
28#include <lely/io2/posix/poll.h>
29#include <lely/util/dllist.h>
30#include <lely/util/util.h>
31
32#include <assert.h>
33#include <errno.h>
34#include <limits.h>
35#include <signal.h>
36#include <stdlib.h>
37
38#if !LELY_NO_THREADS
39#include <pthread.h>
40#endif
41#include <unistd.h>
42
43#ifndef LELY_HAVE_PPOLL
44// TODO: ppoll() is also supported by DragonFly 4.6 and OpenBSD 5.4. On
45// NetBSD 3.0 this function is called pollts().
46#if defined(__linux__) || (__FreeBSD__ >= 11)
47#define LELY_HAVE_PPOLL 1
48#endif
49#endif
50
51#if LELY_HAVE_PPOLL
52#include <poll.h>
53#else
54#include <sys/select.h>
55#endif
56
57#if LELY_HAVE_PPOLL
58#ifndef LELY_IO_PPOLL_NFDS
59#define LELY_IO_PPOLL_NFDS MAX((LELY_VLA_SIZE_MAX / sizeof(struct pollfd)), 1)
60#endif
61#endif
62
63struct io_poll_thrd {
64 int stopped;
65#if !LELY_NO_THREADS
66 pthread_t *thread;
67 struct dlnode node;
68#endif
69};
70
71// clang-format off
72static const struct io_svc_vtbl io_poll_svc_vtbl = {
73 NULL,
74 NULL
75};
76// clang-format on
77
78static void *io_poll_poll_self(const ev_poll_t *poll);
79static int io_poll_poll_wait(ev_poll_t *poll, int timeout);
80static int io_poll_poll_kill(ev_poll_t *poll, void *thr);
81
82// clang-format off
83static const struct ev_poll_vtbl io_poll_poll_vtbl = {
84 &io_poll_poll_self,
85 &io_poll_poll_wait,
86 &io_poll_poll_kill
87};
88// clang-format on
89
90struct io_poll {
91 struct io_svc svc;
92 const struct ev_poll_vtbl *poll_vptr;
93 io_ctx_t *ctx;
94 int signo;
95 struct sigaction oact;
96 sigset_t oset;
97#if !LELY_NO_THREADS
98 pthread_mutex_t mtx;
99#endif
100 struct rbtree tree;
101#if !LELY_NO_THREADS
102 struct dllist threads;
103#endif
104};
105
106static inline io_poll_t *io_poll_from_svc(const struct io_svc *svc);
107static inline io_poll_t *io_poll_from_poll(const ev_poll_t *poll);
108
109static void io_poll_process(
110 io_poll_t *poll, int revents, struct io_poll_watch *watch);
111
112static int io_fd_cmp(const void *p1, const void *p2);
113
114static inline struct io_poll_watch *io_poll_watch_from_node(
115 struct rbnode *node);
116
117static void sig_ign(int signo);
118
119void *
120io_poll_alloc(void)
121{
122 return malloc(sizeof(io_poll_t));
123}
124
125void
126io_poll_free(void *ptr)
127{
128 free(ptr);
129}
130
131io_poll_t *
132io_poll_init(io_poll_t *poll, io_ctx_t *ctx, int signo)
133{
134 assert(poll);
135 assert(ctx);
136
137 if (!signo)
138 signo = SIGUSR1;
139
140 int errsv = 0;
141
142 poll->svc = (struct io_svc)IO_SVC_INIT(&io_poll_svc_vtbl);
143 poll->ctx = ctx;
144
145 poll->poll_vptr = &io_poll_poll_vtbl;
146
147 poll->signo = signo;
148
149 struct sigaction act;
150 act.sa_handler = &sig_ign;
151 sigemptyset(&act.sa_mask);
152 act.sa_flags = 0;
153 if (sigaction(poll->signo, &act, &poll->oact) == -1) {
154 errsv = errno;
155 goto error_sigaction;
156 }
157
158 // Block the wake up signal so it is only delivered during polling.
159 sigset_t set;
160 sigemptyset(&set);
161 sigaddset(&set, poll->signo);
162#if LELY_NO_THREADS
163 if (sigprocmask(SIG_BLOCK, &set, &poll->oset) == -1) {
164 errsv = errno;
165#else
166 if ((errsv = pthread_sigmask(SIG_BLOCK, &set, &poll->oset))) {
167#endif
168 goto error_sigmask;
169 }
170
171#if !LELY_NO_THREADS
172 if ((errsv = pthread_mutex_init(&poll->mtx, NULL)))
173 goto error_init_mtx;
174#endif
175
176 rbtree_init(&poll->tree, &io_fd_cmp);
177
178#if !LELY_NO_THREADS
179 dllist_init(&poll->threads);
180#endif
181
182 io_ctx_insert(poll->ctx, &poll->svc);
183
184 return poll;
185
186#if !LELY_NO_THREADS
187 // pthread_mutex_destroy(&poll->mtx);
188error_init_mtx:
189#endif
190#if LELY_NO_THREADS
191 sigprocmask(SIG_SETMASK, &poll->oset, NULL);
192#else
193 pthread_sigmask(SIG_SETMASK, &poll->oset, NULL);
194#endif
195error_sigmask:
196 sigaction(poll->signo, &poll->oact, NULL);
197error_sigaction:
198 errno = errsv;
199 return NULL;
200}
201
202void
203io_poll_fini(io_poll_t *poll)
204{
205 assert(poll);
206
207 io_ctx_remove(poll->ctx, &poll->svc);
208
209#if !LELY_NO_THREADS
210 assert(dllist_empty(&poll->threads));
211#endif
212
213#if !LELY_NO_THREADS
214 pthread_mutex_destroy(&poll->mtx);
215#endif
216
217 // Clear any pending (and currently blocked) wake up signal.
218 sigset_t set;
219 sigemptyset(&set);
220 sigaddset(&set, poll->signo);
221 int errsv = errno;
222 while (sigtimedwait(&set, &(siginfo_t){ 0 }, &(struct timespec){ 0, 0 })
223 == poll->signo)
224 ;
225 errno = errsv;
226 // Unblock the wake up signal and restore the original signal handler.
227#if LELY_NO_THREADS
228 sigprocmask(SIG_SETMASK, &poll->oset, NULL);
229#else
230 pthread_sigmask(SIG_SETMASK, &poll->oset, NULL);
231#endif
232 sigaction(poll->signo, &poll->oact, NULL);
233}
234
235io_poll_t *
236io_poll_create(io_ctx_t *ctx, int signo)
237{
238 int errsv = 0;
239
240 io_poll_t *poll = io_poll_alloc();
241 if (!poll) {
242 errsv = errno;
243 goto error_alloc;
244 }
245
246 io_poll_t *tmp = io_poll_init(poll, ctx, signo);
247 if (!tmp) {
248 errsv = errno;
249 goto error_init;
250 }
251 poll = tmp;
252
253 return poll;
254
255error_init:
256 io_poll_free(poll);
257error_alloc:
258 errno = errsv;
259 return NULL;
260}
261
262void
264{
265 if (poll) {
266 io_poll_fini(poll);
267 io_poll_free(poll);
268 }
269}
270
271io_ctx_t *
273{
274 return poll->ctx;
275}
276
277ev_poll_t *
279{
280 return &poll->poll_vptr;
281}
282
283int
284io_poll_watch(io_poll_t *poll, int fd, int events, struct io_poll_watch *watch)
285{
286 assert(poll);
287 assert(watch);
288
289 if (fd == -1) {
290 errno = EBADF;
291 return -1;
292 }
293
294 if (events < 0) {
295 errno = EINVAL;
296 return -1;
297 }
298 events &= IO_EVENT_MASK;
299
300 int result = -1;
301 int errsv = errno;
302#if !LELY_NO_THREADS
303 pthread_mutex_lock(&poll->mtx);
304#endif
305
306 struct rbnode *node = rbtree_find(&poll->tree, &fd);
307 if (node) {
308 if (node != &watch->_node) {
309 errsv = EEXIST;
310 goto error;
311 }
312 if (!events)
313 rbtree_remove(&poll->tree, node);
314 } else if (events) {
315#if LELY_HAVE_PPOLL
316 if (rbtree_size(&poll->tree) >= (nfds_t)~0) {
317#else
318 if (rbtree_size(&poll->tree) >= FD_SETSIZE) {
319#endif
320 errsv = EINVAL;
321 goto error;
322 }
323 watch->_fd = fd;
324 rbnode_init(&watch->_node, &watch->_fd);
325 rbtree_insert(&poll->tree, &watch->_node);
326 }
327
328 if (events != watch->_events) {
329 watch->_events = events;
330#if !LELY_NO_THREADS
331 // Wake up all polling threads.
332 dllist_foreach (&poll->threads, node) {
333 struct io_poll_thrd *thr = structof(
334 node, struct io_poll_thrd, node);
335 if (!thr->stopped) {
336 thr->stopped = 1;
337 pthread_kill(*thr->thread, poll->signo);
338 }
339 }
340#endif
341 }
342
343 result = 0;
344
345error:
346#if !LELY_NO_THREADS
347 pthread_mutex_unlock(&poll->mtx);
348#endif
349 errno = errsv;
350 return result;
351}
352
353static void *
354io_poll_poll_self(const ev_poll_t *poll)
355{
356 (void)poll;
357
358#if LELY_NO_THREADS
359 static struct io_poll_thrd thr = { 0 };
360#else
361 static _Thread_local struct io_poll_thrd thr = { 0, NULL, DLNODE_INIT };
362 if (!thr.thread) {
363 static _Thread_local pthread_t thread;
364 thread = pthread_self();
365 thr.thread = &thread;
366 }
367#endif
368 return &thr;
369}
370
371static int
372io_poll_poll_wait(ev_poll_t *poll_, int timeout_)
373{
374 io_poll_t *poll = io_poll_from_poll(poll_);
375 void *thr_ = io_poll_poll_self(poll_);
376 struct io_poll_thrd *thr = (struct io_poll_thrd *)thr_;
377
378 struct timespec tv = { 0, 0 };
379 if (timeout_ > 0) {
380 tv.tv_sec = timeout_ / 1000;
381 tv.tv_nsec = (timeout_ % 1000) * 1000000l;
382 }
383 struct timespec *timeout = timeout_ >= 0 ? &tv : NULL;
384
385 int n = 0;
386 int errsv = errno;
387
388 sigset_t set;
389 sigemptyset(&set);
390
391#if LELY_HAVE_PPOLL
392 struct pollfd fds_[LELY_IO_PPOLL_NFDS];
393 struct pollfd *fds = fds_;
394#else
395 fd_set rfds, wfds, efds;
396#endif
397
398#if !LELY_NO_THREADS
399 pthread_mutex_lock(&poll->mtx);
400 dllist_push_back(&poll->threads, &thr->node);
401#endif
402 if (!timeout_)
403 thr->stopped = 1;
404
405 int stopped = 0;
406 do {
407 if (thr->stopped) {
408 if (rbtree_empty(&poll->tree))
409 break;
410 timeout = &tv;
411 tv = (struct timespec){ 0, 0 };
412 }
413
414#if LELY_HAVE_PPOLL
415 assert(rbtree_size(&poll->tree) <= (nfds_t)~0);
416 if (fds != fds_)
417 free(fds);
418 fds = fds_;
419 nfds_t nfds = rbtree_size(&poll->tree);
420 if (nfds > LELY_IO_PPOLL_NFDS) {
421 fds = malloc(nfds * sizeof(*fds));
422 if (!fds) {
423 errsv = errno;
424 n = -1;
425 break;
426 }
427 }
428
429 nfds_t i = 0;
430 rbtree_foreach (&poll->tree, node) {
431 struct io_poll_watch *watch =
432 io_poll_watch_from_node(node);
433 int events = 0;
434 if (watch->_events & IO_EVENT_IN)
435 events |= POLLIN;
436 if (watch->_events & IO_EVENT_PRI)
437 events |= POLLRDBAND | POLLPRI;
438 if (watch->_events & IO_EVENT_OUT)
439 events |= POLLOUT;
440 fds[i++] = (struct pollfd){ .fd = watch->_fd,
441 .events = events };
442 }
443 assert(i == nfds);
444#else
445 assert(rbtree_size(&poll->tree) <= FD_SETSIZE);
446 FD_ZERO(&rfds);
447 int nrfds = 0;
448 FD_ZERO(&wfds);
449 int nwfds = 0;
450 FD_ZERO(&efds);
451 int nefds = 0;
452
453 rbtree_foreach (&poll->tree, node) {
454 struct io_poll_watch *watch =
455 io_poll_watch_from_node(node);
456 if (watch->_events & IO_EVENT_IN) {
457 FD_SET(watch->_fd, &rfds);
458 nrfds = MAX(nrfds, watch->_fd + 1);
459 }
460 if (watch->_events & IO_EVENT_OUT) {
461 FD_SET(watch->_fd, &wfds);
462 nwfds = MAX(nwfds, watch->_fd + 1);
463 }
464 FD_SET(watch->_fd, &efds);
465 nefds = MAX(nefds, watch->_fd + 1);
466 }
467#endif
468
469#if !LELY_NO_THREADS
470 pthread_mutex_unlock(&poll->mtx);
471#endif
472 errno = 0;
473#if LELY_HAVE_PPOLL
474 int result = ppoll(fds, nfds, timeout, &set);
475#else
476 int result = pselect(nefds, nrfds ? &rfds : NULL,
477 nwfds ? &wfds : NULL, nefds ? &efds : NULL,
478 timeout, &set);
479#endif
480#if !LELY_NO_THREADS
481 pthread_mutex_lock(&poll->mtx);
482#endif
483 if (result == -1) {
484 if (errno == EINTR) {
485 // If the wait is interrupted by a signal, we
486 // don't receive any events. This can result in
487 // starvation if too many signals are generated
488 // (e.g., when too many tasks are queued). To
489 // prevent starvation, we poll for events again,
490 // but this time with the interrupt signal
491 // blocked (and timeout 0).
492 sigaddset(&set, poll->signo);
493 thr->stopped = 1;
494 continue;
495 } else {
496 errsv = errno;
497 n = -1;
498 break;
499 }
500 }
501
502#if LELY_HAVE_PPOLL
503 for (i = 0; i < nfds; i++) {
504 if (!fds[i].revents || (fds[i].revents & POLLNVAL))
505 continue;
506 struct rbnode *node =
507 rbtree_find(&poll->tree, &fds[i].fd);
508 if (!node)
509 continue;
510 struct io_poll_watch *watch =
511 io_poll_watch_from_node(node);
512 int revents = 0;
513 if (fds[i].revents & POLLIN)
514 revents |= IO_EVENT_IN;
515 if (fds[i].revents & (POLLRDBAND | POLLPRI))
516 revents |= IO_EVENT_PRI;
517 if (fds[i].revents & POLLOUT)
518 revents |= IO_EVENT_OUT;
519 if (fds[i].revents & POLLERR)
520 revents |= IO_EVENT_ERR;
521 if (fds[i].revents & POLLHUP)
522 revents |= IO_EVENT_HUP;
523 io_poll_process(poll, revents, watch);
524 n += n < INT_MAX;
525 }
526#else
527 rbtree_foreach (&poll->tree, node) {
528 struct io_poll_watch *watch =
529 io_poll_watch_from_node(node);
530 if (!result)
531 break;
532 int revents = 0;
533 if (FD_ISSET(watch->_fd, &rfds))
534 revents |= IO_EVENT_IN;
535 if (FD_ISSET(watch->_fd, &wfds))
536 revents |= IO_EVENT_OUT;
537 if (FD_ISSET(watch->_fd, &efds)) {
538 if (watch->_events & IO_EVENT_PRI)
539 revents |= IO_EVENT_PRI;
540 revents |= IO_EVENT_ERR;
541 }
542 if (revents) {
543 result--;
544 io_poll_process(poll, revents, watch);
545 n += n < INT_MAX;
546 }
547 }
548#endif
549
550 stopped = thr->stopped = 1;
551 } while (!stopped);
552#if !LELY_NO_THREADS
553 dllist_remove(&poll->threads, &thr->node);
554#endif
555 thr->stopped = 0;
556#if !LELY_NO_THREADS
557 pthread_mutex_unlock(&poll->mtx);
558#endif
559
560#if LELY_HAVE_PPOLL
561 if (fds != fds_)
562 free(fds);
563#endif
564
565 errno = errsv;
566 return n;
567}
568
569static int
570io_poll_poll_kill(ev_poll_t *poll_, void *thr_)
571{
572#if !LELY_NO_THREADS
573 io_poll_t *poll = io_poll_from_poll(poll_);
574#endif
575 struct io_poll_thrd *thr = thr_;
576 assert(thr);
577
578 if (thr_ == io_poll_poll_self(poll_))
579 return 0;
580
581#if !LELY_NO_THREADS
582 pthread_mutex_lock(&poll->mtx);
583#endif
584 int stopped = thr->stopped;
585 if (!stopped)
586 thr->stopped = 1;
587#if !LELY_NO_THREADS
588 pthread_mutex_unlock(&poll->mtx);
589 if (!stopped) {
590 int errsv = pthread_kill(*thr->thread, poll->signo);
591 if (errsv) {
592 errno = errsv;
593 return -1;
594 }
595 }
596#endif
597 return 0;
598}
599
600static inline io_poll_t *
601io_poll_from_svc(const struct io_svc *svc)
602{
603 assert(svc);
604
605 return structof(svc, io_poll_t, svc);
606}
607
608static inline io_poll_t *
609io_poll_from_poll(const ev_poll_t *poll)
610{
611 assert(poll);
612
613 return structof(poll, io_poll_t, poll_vptr);
614}
615
616static void
617io_poll_process(io_poll_t *poll, int revents, struct io_poll_watch *watch)
618{
619 assert(poll);
620 assert(watch);
621
622 rbtree_remove(&poll->tree, &watch->_node);
623 watch->_events = 0;
624 if (watch->func) {
625#if !LELY_NO_THREADS
626 pthread_mutex_unlock(&poll->mtx);
627#endif
628 watch->func(watch, revents);
629#if !LELY_NO_THREADS
630 pthread_mutex_lock(&poll->mtx);
631#endif
632 }
633}
634
635static int
636io_fd_cmp(const void *p1, const void *p2)
637{
638 assert(p1);
639 int fd1 = *(const int *)p1;
640 assert(p2);
641 int fd2 = *(const int *)p2;
642
643 return (fd2 < fd1) - (fd1 < fd2);
644}
645
646static inline struct io_poll_watch *
647io_poll_watch_from_node(struct rbnode *node)
648{
649 assert(node);
650
651 return structof(node, struct io_poll_watch, _node);
652}
653
654static void
655sig_ign(int signo)
656{
657 (void)signo;
658}
659
660#endif // !LELY_NO_STDIO && _POSIX_C_SOURCE >= 200112L
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition ctx.c:141
This header file is part of the utilities library; it contains the doubly-linked list declarations.
void dllist_push_back(struct dllist *list, struct dlnode *node)
Pushes a node to the back of a doubly-linked list.
Definition dllist.h:323
void dllist_init(struct dllist *list)
Initializes a doubly-linked list.
Definition dllist.h:281
int dllist_empty(const struct dllist *list)
Returns 1 if the doubly-linked list is empty, and 0 if not.
Definition dllist.h:290
#define dllist_foreach(list, node)
Iterates in order over each node in a doubly-linked list.
Definition dllist.h:232
#define DLNODE_INIT
The static initializer for dlnode.
Definition dllist.h:48
void dllist_remove(struct dllist *list, struct dlnode *node)
Removes a node from a doubly-linked list.
Definition dllist.h:387
const struct ev_poll_vtbl *const ev_poll_t
The abstract polling interface.
Definition poll.h:32
@ IO_EVENT_HUP
The device has been disconnected.
Definition event.h:56
@ IO_EVENT_IN
Data (other than priority data) MAY be read without blocking.
Definition event.h:35
@ IO_EVENT_OUT
Data (bot normal and priority data) MAY be written without blocking.
Definition event.h:46
@ IO_EVENT_ERR
An error has occurred. This event is always reported.
Definition event.h:48
@ IO_EVENT_PRI
Priority data MAY be read without blocking.
Definition event.h:40
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread stor...
Definition features.h:249
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition util.h:93
#define MAX(a, b)
Returns the maximum of a and b.
Definition util.h:65
ev_poll_t * io_poll_get_poll(const io_poll_t *poll)
Returns a pointer to the ev_poll_t instance corresponding to the I/O polling instance.
Definition poll.c:281
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition poll.c:275
This header file is part of the I/O library; it contains the I/O polling declarations for POSIX platf...
This header file is part of the I/O library; it contains the I/O polling declarations for Windows.
void io_poll_destroy(io_poll_t *poll)
Destroys an I/O polling interface.
Definition poll.c:243
io_poll_t * io_poll_create(void)
Creates a new I/O polling interface.
Definition poll.c:218
void rbnode_init(struct rbnode *node, const void *key)
Initializes a node in a red-black tree.
Definition rbtree.h:237
void rbtree_insert(struct rbtree *tree, struct rbnode *node)
Inserts a node into a red-black tree.
Definition rbtree.c:108
struct rbnode * rbtree_find(const struct rbtree *tree, const void *key)
Finds a node in a red-black tree.
Definition rbtree.c:306
void rbtree_init(struct rbtree *tree, rbtree_cmp_t *cmp)
Initializes a red-black tree.
Definition rbtree.h:248
size_t rbtree_size(const struct rbtree *tree)
Returns the size (in number of nodes) of a red-black tree.
Definition rbtree.h:265
void rbtree_remove(struct rbtree *tree, struct rbnode *node)
Removes a node from a red-black tree.
Definition rbtree.c:187
int rbtree_empty(const struct rbtree *tree)
Returns 1 if the red-black tree is empty, and 0 if not.
Definition rbtree.h:259
#define rbtree_foreach(tree, node)
Iterates over each node in a red-black tree in ascending order.
Definition rbtree.h:234
This is the internal header file of the Windows-specific I/O declarations.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
An I/O polling interface.
Definition poll.c:51
struct rbtree tree
The tree containing the I/O device handles being watched.
Definition poll.c:57
mtx_t mtx
The mutex protecting tree.
Definition poll.c:54
A doubly-linked list.
Definition dllist.h:54
A node in a doubly-linked list.
Definition dllist.h:40
Definition ctx.c:38
An object representing a file descriptor being monitored for I/O events.
Definition poll.h:56
io_poll_watch_func_t * func
A pointer to the function to be invoked when an I/O event occurs.
Definition poll.h:58
Definition poll.c:88
The virtual table of an I/O service.
Definition ctx.h:67
An I/O service.
Definition ctx.h:49
A node in a red-black tree.
Definition rbtree.h:53
A red-black tree.
Definition rbtree.h:91
A time type with nanosecond resolution.
Definition time.h:88
long tv_nsec
Nanoseconds [0, 999999999].
Definition time.h:92
time_t tv_sec
Whole seconds (>= 0).
Definition time.h:90
This header file is part of the C11 and POSIX compatibility library; it includes <unistd....