Lely core libraries 2.3.4
poll.c
#include "io.h"

#if !LELY_NO_STDIO && defined(__linux__)

#include <lely/io2/posix/poll.h>
#include <lely/util/util.h>

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdlib.h>

#if !LELY_NO_THREADS
#include <pthread.h>
#endif
#include <unistd.h>

#include <sys/epoll.h>

// clang-format off
#define EPOLL_EVENT_INIT(events, fd) \
	{ \
		(((events) & IO_EVENT_IN) ? (EPOLLIN | EPOLLRDHUP) : 0) \
				| (((events) & IO_EVENT_PRI) ? EPOLLPRI : 0) \
				| (((events) & IO_EVENT_OUT) ? EPOLLOUT : 0) \
				| EPOLLONESHOT, \
		{ .fd = (fd) } \
	}
// clang-format on
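
// For example (illustrative only), EPOLL_EVENT_INIT(IO_EVENT_IN, 7) expands to
// { EPOLLIN | EPOLLRDHUP | EPOLLONESHOT, { .fd = 7 } }. EPOLLONESHOT is always
// set, so an event is reported at most once until the watch is re-armed.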
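
// The maximum number of events retrieved by a single call to epoll_pwait().
// The default bounds the stack-allocated event array in io_poll_poll_wait() to
// at most LELY_VLA_SIZE_MAX bytes.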
#ifndef LELY_IO_EPOLL_MAXEVENTS
#define LELY_IO_EPOLL_MAXEVENTS \
	MAX((LELY_VLA_SIZE_MAX / sizeof(struct epoll_event)), 1)
#endif
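
// The state of a thread polling for I/O events; obtained (and lazily
// initialized) by io_poll_poll_self().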
struct io_poll_thrd {
	int stopped;
#if !LELY_NO_THREADS
	pthread_t *thread;
#endif
};

static int io_poll_svc_notify_fork(struct io_svc *svc, enum io_fork_event e);

// clang-format off
static const struct io_svc_vtbl io_poll_svc_vtbl = {
	&io_poll_svc_notify_fork,
	NULL
};
// clang-format on

static void *io_poll_poll_self(const ev_poll_t *poll);
static int io_poll_poll_wait(ev_poll_t *poll, int timeout);
static int io_poll_poll_kill(ev_poll_t *poll, void *thr);

// clang-format off
static const struct ev_poll_vtbl io_poll_poll_vtbl = {
	&io_poll_poll_self,
	&io_poll_poll_wait,
	&io_poll_poll_kill
};
// clang-format on

struct io_poll {
	struct io_svc svc;
	const struct ev_poll_vtbl *poll_vptr;
	io_ctx_t *ctx;
	int signo;
	struct sigaction oact;
	sigset_t oset;
	int epfd;
#if !LELY_NO_THREADS
	pthread_mutex_t mtx;
#endif
	struct rbtree tree;
	size_t nwatch;
};

static inline io_poll_t *io_poll_from_svc(const struct io_svc *svc);
static inline io_poll_t *io_poll_from_poll(const ev_poll_t *poll);

static int io_poll_open(io_poll_t *poll);
static int io_poll_close(io_poll_t *poll);

static void io_poll_process(
		io_poll_t *poll, int revents, struct io_poll_watch *watch);

static int io_fd_cmp(const void *p1, const void *p2);

static inline struct io_poll_watch *io_poll_watch_from_node(
		struct rbnode *node);

static void sig_ign(int signo);

void *
io_poll_alloc(void)
{
	return malloc(sizeof(io_poll_t));
}

void
io_poll_free(void *ptr)
{
	free(ptr);
}

io_poll_t *
io_poll_init(io_poll_t *poll, io_ctx_t *ctx, int signo)
{
	assert(poll);
	assert(ctx);

	if (!signo)
		signo = SIGUSR1;

	int errsv = 0;

	poll->svc = (struct io_svc)IO_SVC_INIT(&io_poll_svc_vtbl);
	poll->ctx = ctx;

	poll->poll_vptr = &io_poll_poll_vtbl;

	poll->signo = signo;

	struct sigaction act;
	act.sa_handler = &sig_ign;
	sigemptyset(&act.sa_mask);
	act.sa_flags = 0;
	if (sigaction(poll->signo, &act, &poll->oact) == -1) {
		errsv = errno;
		goto error_sigaction;
	}

	// Block the wake-up signal so it is only delivered during polling.
	sigset_t set;
	sigemptyset(&set);
	sigaddset(&set, poll->signo);
#if LELY_NO_THREADS
	if (sigprocmask(SIG_BLOCK, &set, &poll->oset) == -1) {
		errsv = errno;
#else
	if ((errsv = pthread_sigmask(SIG_BLOCK, &set, &poll->oset))) {
#endif
		goto error_sigmask;
	}

	poll->epfd = -1;
#if !LELY_NO_THREADS
	if ((errsv = pthread_mutex_init(&poll->mtx, NULL)))
		goto error_init_mtx;
#endif

	rbtree_init(&poll->tree, &io_fd_cmp);
	poll->nwatch = 0;

	if (io_poll_open(poll) == -1) {
		errsv = errno;
		goto error_open;
	}

	io_ctx_insert(poll->ctx, &poll->svc);

	return poll;

	// io_poll_close(poll);
error_open:
#if !LELY_NO_THREADS
	pthread_mutex_destroy(&poll->mtx);
error_init_mtx:
#endif
#if LELY_NO_THREADS
	sigprocmask(SIG_SETMASK, &poll->oset, NULL);
#else
	pthread_sigmask(SIG_SETMASK, &poll->oset, NULL);
#endif
error_sigmask:
	sigaction(poll->signo, &poll->oact, NULL);
error_sigaction:
	errno = errsv;
	return NULL;
}

void
io_poll_fini(io_poll_t *poll)
{
	assert(poll);

	io_ctx_remove(poll->ctx, &poll->svc);

	io_poll_close(poll);

#if !LELY_NO_THREADS
	pthread_mutex_destroy(&poll->mtx);
#endif

	// Clear any pending (and currently blocked) wake-up signal.
	sigset_t set;
	sigemptyset(&set);
	sigaddset(&set, poll->signo);
	int errsv = errno;
	while (sigtimedwait(&set, &(siginfo_t){ 0 }, &(struct timespec){ 0, 0 })
			== poll->signo)
		;
	errno = errsv;
	// Unblock the wake-up signal and restore the original signal handler.
#if LELY_NO_THREADS
	sigprocmask(SIG_SETMASK, &poll->oset, NULL);
#else
	pthread_sigmask(SIG_SETMASK, &poll->oset, NULL);
#endif
	sigaction(poll->signo, &poll->oact, NULL);
}

io_poll_t *
io_poll_create(io_ctx_t *ctx, int signo)
{
	int errsv = 0;

	io_poll_t *poll = io_poll_alloc();
	if (!poll) {
		errsv = errno;
		goto error_alloc;
	}

	io_poll_t *tmp = io_poll_init(poll, ctx, signo);
	if (!tmp) {
		errsv = errno;
		goto error_init;
	}
	poll = tmp;

	return poll;

error_init:
	io_poll_free(poll);
error_alloc:
	errno = errsv;
	return NULL;
}

void
io_poll_destroy(io_poll_t *poll)
{
	if (poll) {
		io_poll_fini(poll);
		io_poll_free(poll);
	}
}

io_ctx_t *
io_poll_get_ctx(const io_poll_t *poll)
{
	return poll->ctx;
}

ev_poll_t *
io_poll_get_poll(const io_poll_t *poll)
{
	return &poll->poll_vptr;
}
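
/*
 * Example (an illustrative sketch, not part of this file): creating an I/O
 * polling instance and obtaining its abstract polling interface. The
 * io_ctx_create() and io_ctx_destroy() functions are assumed to be provided
 * by the I/O context API.
 *
 *	io_ctx_t *ctx = io_ctx_create();
 *	io_poll_t *poll = io_poll_create(ctx, 0); // signo 0 selects SIGUSR1
 *	ev_poll_t *ev_poll = io_poll_get_poll(poll);
 *	// ... poll for events, e.g., with ev_poll_wait(ev_poll, timeout) ...
 *	io_poll_destroy(poll);
 *	io_ctx_destroy(ctx);
 */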

int
io_poll_watch(io_poll_t *poll, int fd, int events, struct io_poll_watch *watch)
{
	assert(poll);
	assert(watch);
	int epfd = poll->epfd;

	if (fd == -1 || fd == epfd) {
		errno = EBADF;
		return -1;
	}

	if (events < 0) {
		errno = EINVAL;
		return -1;
	}
	events &= IO_EVENT_MASK;

	int result = -1;
	int errsv = errno;
#if !LELY_NO_THREADS
	pthread_mutex_lock(&poll->mtx);
#endif

	struct rbnode *node = rbtree_find(&poll->tree, &fd);
	if (node && node != &watch->_node) {
		errsv = EEXIST;
		goto error;
	}

	if (events) {
		struct epoll_event event = EPOLL_EVENT_INIT(events, fd);
		if (node && events != watch->_events) {
			if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &event) == -1) {
				errsv = errno;
				epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
				rbtree_remove(&poll->tree, node);
				if (watch->_events)
					poll->nwatch--;
				watch->_events = 0;
				goto error;
			}
		} else if (!node) {
			if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) == -1) {
				errsv = errno;
				goto error;
			}
			watch->_fd = fd;
			rbnode_init(&watch->_node, &watch->_fd);
			watch->_events = 0;
			rbtree_insert(&poll->tree, &watch->_node);
		}
		if (!watch->_events)
			poll->nwatch++;
		watch->_events = events;
	} else if (node) {
		epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
		rbtree_remove(&poll->tree, node);
		if (watch->_events)
			poll->nwatch--;
		watch->_events = 0;
	}

	result = 0;

error:
#if !LELY_NO_THREADS
	pthread_mutex_unlock(&poll->mtx);
#endif
	errno = errsv;
	return result;
}
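
/*
 * Example (an illustrative sketch, not part of this file): watching a file
 * descriptor for input. The IO_POLL_WATCH_INIT initializer is assumed to be
 * provided by <lely/io2/posix/poll.h>. Since watches are registered with
 * EPOLLONESHOT, the callback has to call io_poll_watch() again to receive
 * further events.
 *
 *	static void on_input(struct io_poll_watch *watch, int revents) {
 *		// ... handle revents, then optionally re-arm the watch ...
 *	}
 *
 *	struct io_poll_watch watch = IO_POLL_WATCH_INIT(&on_input);
 *	if (io_poll_watch(poll, fd, IO_EVENT_IN, &watch) == -1)
 *		perror("io_poll_watch");
 */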
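
// On fork, re-create the epoll instance in the child and re-register all
// watched file descriptors; the epoll registrations inherited from the parent
// are shared with it and cannot be reused safely.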
static int
io_poll_svc_notify_fork(struct io_svc *svc, enum io_fork_event e)
{
	io_poll_t *poll = io_poll_from_svc(svc);

	if (e != IO_FORK_CHILD)
		return 0;

	int result = 0;
	int errsv = errno;

	if (io_poll_close(poll) == -1) {
		errsv = errno;
		result = -1;
	}

	if (io_poll_open(poll) == -1 && !result) {
		errsv = errno;
		result = -1;
	}

	int epfd = poll->epfd;
	rbtree_foreach (&poll->tree, node) {
		struct io_poll_watch *watch = io_poll_watch_from_node(node);
		int fd = watch->_fd;
		int events = watch->_events;
		if (events) {
			struct epoll_event event = EPOLL_EVENT_INIT(events, fd);

			if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) == -1) {
				if (!result) {
					errsv = errno;
					result = -1;
				}
				rbtree_remove(&poll->tree, node);
			}
		} else {
			rbtree_remove(&poll->tree, node);
		}
	}

	errno = errsv;
	return result;
}
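
// Returns the calling thread's polling state. The pthread_t handle is stored
// in a separate _Thread_local object so that its address, saved in the state,
// can be used by other threads (see io_poll_poll_kill()).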
static void *
io_poll_poll_self(const ev_poll_t *poll)
{
	(void)poll;

#if LELY_NO_THREADS
	static struct io_poll_thrd thr = { 0 };
#else
	static _Thread_local struct io_poll_thrd thr = { 0, NULL };
	if (!thr.thread) {
		static _Thread_local pthread_t thread;
		thread = pthread_self();
		thr.thread = &thread;
	}
#endif
	return &thr;
}
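
// Polls for I/O events and invokes the corresponding watch callbacks. The loop
// keeps polling (with a zero timeout once the thread is stopped) as long as
// epoll_pwait() returns a full batch of LELY_IO_EPOLL_MAXEVENTS events.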
static int
io_poll_poll_wait(ev_poll_t *poll_, int timeout)
{
	io_poll_t *poll = io_poll_from_poll(poll_);
	void *thr_ = io_poll_poll_self(poll_);
	struct io_poll_thrd *thr = (struct io_poll_thrd *)thr_;

	int n = 0;
	int errsv = errno;

	sigset_t set;
	sigemptyset(&set);

	struct epoll_event events[LELY_IO_EPOLL_MAXEVENTS];

#if !LELY_NO_THREADS
	pthread_mutex_lock(&poll->mtx);
#endif
	if (!timeout)
		thr->stopped = 1;

	int stopped = 0;
	do {
		if (thr->stopped) {
			if (!poll->nwatch)
				break;
			timeout = 0;
		}

#if !LELY_NO_THREADS
		pthread_mutex_unlock(&poll->mtx);
#endif
		errno = 0;
		int nevents = epoll_pwait(poll->epfd, events,
				LELY_IO_EPOLL_MAXEVENTS, timeout, &set);
		if (nevents == -1 && errno == EINTR) {
			// If the wait is interrupted by a signal, we don't
			// receive any events. This can result in starvation if
			// too many signals are generated (e.g., when too many
			// tasks are queued). To prevent starvation, we poll for
			// events again, but this time with the interrupt signal
			// blocked (and timeout 0).
			sigaddset(&set, poll->signo);
#if !LELY_NO_THREADS
			pthread_mutex_lock(&poll->mtx);
#endif
			thr->stopped = 1;
			continue;
		}
		if (nevents == -1) {
			if (!n) {
				errsv = errno;
				n = -1;
			}
#if !LELY_NO_THREADS
			pthread_mutex_lock(&poll->mtx);
#endif
			break;
		}

		for (int i = 0; i < nevents; i++) {
			int revents = 0;
			if (events[i].events & (EPOLLIN | EPOLLRDHUP))
				revents |= IO_EVENT_IN;
			if (events[i].events & EPOLLPRI)
				revents |= IO_EVENT_PRI;
			if (events[i].events & EPOLLOUT)
				revents |= IO_EVENT_OUT;
			if (events[i].events & EPOLLERR)
				revents |= IO_EVENT_ERR;
			if (events[i].events & EPOLLHUP)
				revents |= IO_EVENT_HUP;
			int fd = events[i].data.fd;
#if !LELY_NO_THREADS
			pthread_mutex_lock(&poll->mtx);
#endif
			struct rbnode *node = rbtree_find(&poll->tree, &fd);
			if (node) {
				struct io_poll_watch *watch =
						io_poll_watch_from_node(node);
				io_poll_process(poll, revents, watch);
				n += n < INT_MAX;
			}
#if !LELY_NO_THREADS
			pthread_mutex_unlock(&poll->mtx);
#endif
		}

#if !LELY_NO_THREADS
		pthread_mutex_lock(&poll->mtx);
#endif
		thr->stopped = 1;
		stopped = nevents != LELY_IO_EPOLL_MAXEVENTS;
	} while (!stopped);
	thr->stopped = 0;
#if !LELY_NO_THREADS
	pthread_mutex_unlock(&poll->mtx);
#endif

	errno = errsv;
	return n;
}
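
// Interrupts a thread blocked in io_poll_poll_wait() by marking it as stopped
// and, unless it already was, sending it the wake-up signal.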
static int
io_poll_poll_kill(ev_poll_t *poll_, void *thr_)
{
#if !LELY_NO_THREADS
	io_poll_t *poll = io_poll_from_poll(poll_);
#endif
	struct io_poll_thrd *thr = thr_;
	assert(thr);

	if (thr_ == io_poll_poll_self(poll_))
		return 0;

#if !LELY_NO_THREADS
	pthread_mutex_lock(&poll->mtx);
#endif
	int stopped = thr->stopped;
	if (!stopped)
		thr->stopped = 1;
#if !LELY_NO_THREADS
	pthread_mutex_unlock(&poll->mtx);
	if (!stopped) {
		int errsv = pthread_kill(*thr->thread, poll->signo);
		if (errsv) {
			errno = errsv;
			return -1;
		}
	}
#endif
	return 0;
}

static inline io_poll_t *
io_poll_from_svc(const struct io_svc *svc)
{
	assert(svc);

	return structof(svc, io_poll_t, svc);
}

static inline io_poll_t *
io_poll_from_poll(const ev_poll_t *poll)
{
	assert(poll);

	return structof(poll, io_poll_t, poll_vptr);
}

static int
io_poll_open(io_poll_t *poll)
{
	if (io_poll_close(poll) == -1)
		return -1;

	return (poll->epfd = epoll_create1(EPOLL_CLOEXEC)) == -1 ? -1 : 0;
}

static int
io_poll_close(io_poll_t *poll)
{
	assert(poll);

	int epfd = poll->epfd;
	if (epfd == -1)
		return 0;
	poll->epfd = -1;

	return close(epfd);
}
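
// Disarms a watch and invokes its callback. Since the file descriptor was
// registered with EPOLLONESHOT, the kernel has already disabled it; the watch
// has to be re-registered with io_poll_watch() to receive further events.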
static void
io_poll_process(io_poll_t *poll, int revents, struct io_poll_watch *watch)
{
	assert(poll);
	assert(poll->nwatch);
	assert(watch);
	assert(watch->_events);

	watch->_events = 0;
	poll->nwatch--;
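
	// Invoke the user-defined callback without holding the mutex, so that
	// the callback itself can call io_poll_watch() to re-arm the watch.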
	if (watch->func) {
#if !LELY_NO_THREADS
		pthread_mutex_unlock(&poll->mtx);
#endif
		watch->func(watch, revents);
#if !LELY_NO_THREADS
		pthread_mutex_lock(&poll->mtx);
#endif
	}
}

static int
io_fd_cmp(const void *p1, const void *p2)
{
	assert(p1);
	int fd1 = *(const int *)p1;
	assert(p2);
	int fd2 = *(const int *)p2;
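
	// Three-way comparison: yields -1, 0 or 1 and avoids the
	// signed-integer overflow that computing fd1 - fd2 could cause.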
	return (fd2 < fd1) - (fd1 < fd2);
}

static inline struct io_poll_watch *
io_poll_watch_from_node(struct rbnode *node)
{
	assert(node);

	return structof(node, struct io_poll_watch, _node);
}

static void
sig_ign(int signo)
{
	(void)signo;
}

#endif // !LELY_NO_STDIO && __linux__