Lely core libraries  2.3.4
poll.c
Go to the documentation of this file.
1 
24 #include "io.h"
25 
26 #if !LELY_NO_STDIO && defined(__linux__)
27 
28 #include <lely/io2/posix/poll.h>
29 #include <lely/util/util.h>
30 
31 #include <assert.h>
32 #include <errno.h>
33 #include <limits.h>
34 #include <signal.h>
35 #include <stdlib.h>
36 
37 #if !LELY_NO_THREADS
38 #include <pthread.h>
39 #endif
40 #include <unistd.h>
41 
42 #include <sys/epoll.h>
43 
44 // clang-format off
45 #define EPOLL_EVENT_INIT(events, fd) \
46  { \
47  (((events) & IO_EVENT_IN) ? (EPOLLIN | EPOLLRDHUP) : 0) \
48  | (((events) & IO_EVENT_PRI) ? EPOLLPRI : 0) \
49  | (((events) & IO_EVENT_OUT) ? EPOLLOUT : 0) \
50  | EPOLLONESHOT, \
51  { .fd = (fd) } \
52  }
53 // clang-format on
54 
55 #ifndef LELY_IO_EPOLL_MAXEVENTS
56 #define LELY_IO_EPOLL_MAXEVENTS \
57  MAX((LELY_VLA_SIZE_MAX / sizeof(struct epoll_event)), 1)
58 #endif
59 
/*
 * The per-thread polling state returned (as an opaque pointer) by
 * io_poll_poll_self() and accepted by io_poll_poll_kill().
 */
struct io_poll_thrd {
	// Non-zero while the thread must not block in the wait: set by
	// io_poll_poll_kill() and by io_poll_poll_wait() itself, and cleared
	// again just before io_poll_poll_wait() returns.
	int stopped;
#if !LELY_NO_THREADS
	// Points at the pthread_t of the owning thread; dereferenced by
	// io_poll_poll_kill() as the target of pthread_kill().
	pthread_t *thread;
#endif
};
66 
// The fork notification hook of the I/O service embedded in struct io_poll.
static int io_poll_svc_notify_fork(struct io_svc *svc, enum io_fork_event e);

// clang-format off
// The virtual table installed in io_poll::svc by io_poll_init(): the first
// slot is the fork notification hook, the second slot is left NULL.
static const struct io_svc_vtbl io_poll_svc_vtbl = {
	&io_poll_svc_notify_fork,
	NULL
};
// clang-format on

// The three functions implementing the ev_poll_t interface, declared in the
// order in which they appear in io_poll_poll_vtbl.
static void *io_poll_poll_self(const ev_poll_t *poll);
static int io_poll_poll_wait(ev_poll_t *poll, int timeout);
static int io_poll_poll_kill(ev_poll_t *poll, void *thr);

// clang-format off
// The virtual table stored in io_poll::poll_vptr by io_poll_init().
static const struct ev_poll_vtbl io_poll_poll_vtbl = {
	&io_poll_poll_self,
	&io_poll_poll_wait,
	&io_poll_poll_kill
};
// clang-format on
87 
/*
 * The epoll-based I/O polling instance. It embeds an I/O service (so it can
 * be registered with an I/O context) and the vtable pointer through which it
 * is used as an ev_poll_t.
 */
struct io_poll {
	// The I/O service registered with `ctx`; io_poll_from_svc() recovers the
	// enclosing struct from a pointer to this member.
	struct io_svc svc;
	// The ev_poll_t vtable pointer; io_poll_from_poll() recovers the
	// enclosing struct from a pointer to this member.
	const struct ev_poll_vtbl *poll_vptr;
	// The I/O context with which `svc` is registered.
	io_ctx_t *ctx;
	// The signal sent by io_poll_poll_kill() to interrupt a waiting thread.
	int signo;
	// The disposition of `signo` before io_poll_init() replaced it; restored
	// by io_poll_fini().
	struct sigaction oact;
	// The signal mask saved by io_poll_init() when it blocked `signo`;
	// restored by io_poll_fini().
	sigset_t oset;
	// The epoll file descriptor, or -1 if it is not open.
	int epfd;
#if !LELY_NO_THREADS
	// Held around accesses to `tree`, `nwatch` and the `stopped` flag of
	// struct io_poll_thrd.
	pthread_mutex_t mtx;
#endif
	// The registered watches (struct io_poll_watch), keyed by file
	// descriptor and ordered by io_fd_cmp().
	struct rbtree tree;
	// The number of watches that currently have a non-zero event mask.
	size_t nwatch;
};
102 
// Conversions from the embedded service / vtable pointer back to the
// enclosing io_poll_t.
static inline io_poll_t *io_poll_from_svc(const struct io_svc *svc);
static inline io_poll_t *io_poll_from_poll(const ev_poll_t *poll);

// (Re)creates and closes the epoll file descriptor stored in io_poll::epfd.
static int io_poll_open(io_poll_t *poll);
static int io_poll_close(io_poll_t *poll);

// Disarms a watch and invokes its callback with the reported events.
static void io_poll_process(
		io_poll_t *poll, int revents, struct io_poll_watch *watch);

// The comparison function for the keys (file descriptors) of io_poll::tree.
static int io_fd_cmp(const void *p1, const void *p2);

// Conversion from a tree node to the watch that contains it.
static inline struct io_poll_watch *io_poll_watch_from_node(
		struct rbnode *node);

// The no-op handler installed for the wake-up signal.
static void sig_ign(int signo);
118 
119 void *
120 io_poll_alloc(void)
121 {
122  return malloc(sizeof(io_poll_t));
123 }
124 
/*
 * Releases storage obtained from io_poll_alloc(). The pointer is handed
 * straight to free(), so a null pointer is accepted and ignored.
 */
void
io_poll_free(void *ptr)
{
	free(ptr);
}
130 
131 io_poll_t *
132 io_poll_init(io_poll_t *poll, io_ctx_t *ctx, int signo)
133 {
134  assert(poll);
135  assert(ctx);
136 
137  if (!signo)
138  signo = SIGUSR1;
139 
140  int errsv = 0;
141 
142  poll->svc = (struct io_svc)IO_SVC_INIT(&io_poll_svc_vtbl);
143  poll->ctx = ctx;
144 
145  poll->poll_vptr = &io_poll_poll_vtbl;
146 
147  poll->signo = signo;
148 
149  struct sigaction act;
150  act.sa_handler = &sig_ign;
151  sigemptyset(&act.sa_mask);
152  act.sa_flags = 0;
153  if (sigaction(poll->signo, &act, &poll->oact) == -1) {
154  errsv = errno;
155  goto error_sigaction;
156  }
157 
158  // Block the wake up signal so it is only delivered during polling.
159  sigset_t set;
160  sigemptyset(&set);
161  sigaddset(&set, poll->signo);
162 #if LELY_NO_THREADS
163  if (sigprocmask(SIG_BLOCK, &set, &poll->oset) == -1) {
164  errsv = errno;
165 #else
166  if ((errsv = pthread_sigmask(SIG_BLOCK, &set, &poll->oset))) {
167 #endif
168  goto error_sigmask;
169  }
170 
171  poll->epfd = -1;
172 #if !LELY_NO_THREADS
173  if ((errsv = pthread_mutex_init(&poll->mtx, NULL)))
174  goto error_init_mtx;
175 #endif
176 
177  rbtree_init(&poll->tree, &io_fd_cmp);
178  poll->nwatch = 0;
179 
180  if (io_poll_open(poll) == -1) {
181  errsv = errno;
182  goto error_open;
183  }
184 
185  io_ctx_insert(poll->ctx, &poll->svc);
186 
187  return poll;
188 
189  // io_poll_close(poll);
190 error_open:
191 #if !LELY_NO_THREADS
192  pthread_mutex_destroy(&poll->mtx);
193 error_init_mtx:
194 #endif
195 #if LELY_NO_THREADS
196  sigprocmask(SIG_SETMASK, &poll->oset, NULL);
197 #else
198  pthread_sigmask(SIG_SETMASK, &poll->oset, NULL);
199 #endif
200 error_sigmask:
201  sigaction(poll->signo, &poll->oact, NULL);
202 error_sigaction:
203  errno = errsv;
204  return NULL;
205 }
206 
207 void
208 io_poll_fini(io_poll_t *poll)
209 {
210  assert(poll);
211 
212  io_ctx_remove(poll->ctx, &poll->svc);
213 
214  io_poll_close(poll);
215 
216 #if !LELY_NO_THREADS
217  pthread_mutex_destroy(&poll->mtx);
218 #endif
219 
220  // Clear any pending (and currently blocked) wake up signal.
221  sigset_t set;
222  sigemptyset(&set);
223  sigaddset(&set, poll->signo);
224  int errsv = errno;
225  while (sigtimedwait(&set, &(siginfo_t){ 0 }, &(struct timespec){ 0, 0 })
226  == poll->signo)
227  ;
228  errno = errsv;
229  // Unblock the wake up signal and restore the original signal handler.
230 #if LELY_NO_THREADS
231  sigprocmask(SIG_SETMASK, &poll->oset, NULL);
232 #else
233  pthread_sigmask(SIG_SETMASK, &poll->oset, NULL);
234 #endif
235  sigaction(poll->signo, &poll->oact, NULL);
236 }
237 
238 io_poll_t *
239 io_poll_create(io_ctx_t *ctx, int signo)
240 {
241  int errsv = 0;
242 
243  io_poll_t *poll = io_poll_alloc();
244  if (!poll) {
245  errsv = errno;
246  goto error_alloc;
247  }
248 
249  io_poll_t *tmp = io_poll_init(poll, ctx, signo);
250  if (!tmp) {
251  errsv = errno;
252  goto error_init;
253  }
254  poll = tmp;
255 
256  return poll;
257 
258 error_init:
259  io_poll_free(poll);
260 error_alloc:
261  errno = errsv;
262  return NULL;
263 }
264 
265 void
267 {
268  if (poll) {
269  io_poll_fini(poll);
270  io_poll_free(poll);
271  }
272 }
273 
274 io_ctx_t *
276 {
277  return poll->ctx;
278 }
279 
280 ev_poll_t *
282 {
283  return &poll->poll_vptr;
284 }
285 
286 int
287 io_poll_watch(io_poll_t *poll, int fd, int events, struct io_poll_watch *watch)
288 {
289  assert(poll);
290  assert(watch);
291  int epfd = poll->epfd;
292 
293  if (fd == -1 || fd == epfd) {
294  errno = EBADF;
295  return -1;
296  }
297 
298  if (events < 0) {
299  errno = EINVAL;
300  return -1;
301  }
302  events &= IO_EVENT_MASK;
303 
304  int result = -1;
305  int errsv = errno;
306 #if !LELY_NO_THREADS
307  pthread_mutex_lock(&poll->mtx);
308 #endif
309 
310  struct rbnode *node = rbtree_find(&poll->tree, &fd);
311  if (node && node != &watch->_node) {
312  errsv = EEXIST;
313  goto error;
314  }
315 
316  if (events) {
317  struct epoll_event event = EPOLL_EVENT_INIT(events, fd);
318  if (node && events != watch->_events) {
319  if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &event) == -1) {
320  errsv = errno;
321  epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
322  rbtree_remove(&poll->tree, node);
323  if (watch->_events)
324  poll->nwatch--;
325  watch->_events = 0;
326  goto error;
327  }
328  } else if (!node) {
329  if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) == -1) {
330  errsv = errno;
331  goto error;
332  }
333  watch->_fd = fd;
334  rbnode_init(&watch->_node, &watch->_fd);
335  watch->_events = 0;
336  rbtree_insert(&poll->tree, &watch->_node);
337  }
338  if (!watch->_events)
339  poll->nwatch++;
340  watch->_events = events;
341  } else if (node) {
342  epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
343  rbtree_remove(&poll->tree, node);
344  if (watch->_events)
345  poll->nwatch--;
346  watch->_events = 0;
347  }
348 
349  result = 0;
350 
351 error:
352 #if !LELY_NO_THREADS
353  pthread_mutex_unlock(&poll->mtx);
354 #endif
355  errno = errsv;
356  return result;
357 }
358 
359 static int
360 io_poll_svc_notify_fork(struct io_svc *svc, enum io_fork_event e)
361 {
362  io_poll_t *poll = io_poll_from_svc(svc);
363 
364  if (e != IO_FORK_CHILD)
365  return 0;
366 
367  int result = 0;
368  int errsv = errno;
369 
370  if (io_poll_close(poll) == -1) {
371  errsv = errno;
372  result = -1;
373  }
374 
375  if (io_poll_open(poll) == -1 && !result) {
376  errsv = errno;
377  result = -1;
378  }
379 
380  int epfd = poll->epfd;
381  rbtree_foreach (&poll->tree, node) {
382  struct io_poll_watch *watch = io_poll_watch_from_node(node);
383  int fd = watch->_fd;
384  int events = watch->_events;
385  if (events) {
386  struct epoll_event event = EPOLL_EVENT_INIT(events, fd);
387 
388  if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) == -1) {
389  if (!result) {
390  errsv = errno;
391  result = -1;
392  }
393  rbtree_remove(&poll->tree, node);
394  }
395  } else {
396  rbtree_remove(&poll->tree, node);
397  }
398  }
399 
400  errno = errsv;
401  return result;
402 }
403 
404 static void *
405 io_poll_poll_self(const ev_poll_t *poll)
406 {
407  (void)poll;
408 
409 #if LELY_NO_THREADS
410  static struct io_poll_thrd thr = { 0 };
411 #else
412  static _Thread_local struct io_poll_thrd thr = { 0, NULL };
413  if (!thr.thread) {
414  static _Thread_local pthread_t thread;
415  thread = pthread_self();
416  thr.thread = &thread;
417  }
418 #endif
419  return &thr;
420 }
421 
422 static int
423 io_poll_poll_wait(ev_poll_t *poll_, int timeout)
424 {
425  io_poll_t *poll = io_poll_from_poll(poll_);
426  void *thr_ = io_poll_poll_self(poll_);
427  struct io_poll_thrd *thr = (struct io_poll_thrd *)thr_;
428 
429  int n = 0;
430  int errsv = errno;
431 
432  sigset_t set;
433  sigemptyset(&set);
434 
435  struct epoll_event events[LELY_IO_EPOLL_MAXEVENTS];
436 
437 #if !LELY_NO_THREADS
438  pthread_mutex_lock(&poll->mtx);
439 #endif
440  if (!timeout)
441  thr->stopped = 1;
442 
443  int stopped = 0;
444  do {
445  if (thr->stopped) {
446  if (!poll->nwatch)
447  break;
448  timeout = 0;
449  }
450 
451 #if !LELY_NO_THREADS
452  pthread_mutex_unlock(&poll->mtx);
453 #endif
454  errno = 0;
455  int nevents = epoll_pwait(poll->epfd, events,
456  LELY_IO_EPOLL_MAXEVENTS, timeout, &set);
457  if (nevents == -1 && errno == EINTR) {
458  // If the wait is interrupted by a signal, we don't
459  // receive any events. This can result in starvation if
460  // too many signals are generated (e.g., when too many
461  // tasks are queued). To prevent starvation, we poll for
462  // events again, but this time with the interrupt signal
463  // blocked (and timeout 0).
464  sigaddset(&set, poll->signo);
465 #if !LELY_NO_THREADS
466  pthread_mutex_lock(&poll->mtx);
467 #endif
468  thr->stopped = 1;
469  continue;
470  }
471  if (nevents == -1) {
472  if (!n) {
473  errsv = errno;
474  n = -1;
475  }
476 #if !LELY_NO_THREADS
477  pthread_mutex_lock(&poll->mtx);
478 #endif
479  break;
480  }
481 
482  for (int i = 0; i < nevents; i++) {
483  int revents = 0;
484  if (events[i].events & (EPOLLIN | EPOLLRDHUP))
485  revents |= IO_EVENT_IN;
486  if (events[i].events & EPOLLPRI)
487  revents |= IO_EVENT_PRI;
488  if (events[i].events & EPOLLOUT)
489  revents |= IO_EVENT_OUT;
490  if (events[i].events & EPOLLERR)
491  revents |= IO_EVENT_ERR;
492  if (events[i].events & EPOLLHUP)
493  revents |= IO_EVENT_HUP;
494  int fd = events[i].data.fd;
495 #if !LELY_NO_THREADS
496  pthread_mutex_lock(&poll->mtx);
497 #endif
498  struct rbnode *node = rbtree_find(&poll->tree, &fd);
499  if (node) {
500  struct io_poll_watch *watch =
501  io_poll_watch_from_node(node);
502  io_poll_process(poll, revents, watch);
503  n += n < INT_MAX;
504  }
505 #if !LELY_NO_THREADS
506  pthread_mutex_unlock(&poll->mtx);
507 #endif
508  }
509 
510 #if !LELY_NO_THREADS
511  pthread_mutex_lock(&poll->mtx);
512 #endif
513  thr->stopped = 1;
514  stopped = nevents != LELY_IO_EPOLL_MAXEVENTS;
515  } while (!stopped);
516  thr->stopped = 0;
517 #if !LELY_NO_THREADS
518  pthread_mutex_unlock(&poll->mtx);
519 #endif
520 
521  errno = errsv;
522  return n;
523 }
524 
525 static int
526 io_poll_poll_kill(ev_poll_t *poll_, void *thr_)
527 {
528 #if !LELY_NO_THREADS
529  io_poll_t *poll = io_poll_from_poll(poll_);
530 #endif
531  struct io_poll_thrd *thr = thr_;
532  assert(thr);
533 
534  if (thr_ == io_poll_poll_self(poll_))
535  return 0;
536 
537 #if !LELY_NO_THREADS
538  pthread_mutex_lock(&poll->mtx);
539 #endif
540  int stopped = thr->stopped;
541  if (!stopped)
542  thr->stopped = 1;
543 #if !LELY_NO_THREADS
544  pthread_mutex_unlock(&poll->mtx);
545  if (!stopped) {
546  int errsv = pthread_kill(*thr->thread, poll->signo);
547  if (errsv) {
548  errno = errsv;
549  return -1;
550  }
551  }
552 #endif
553  return 0;
554 }
555 
556 static inline io_poll_t *
557 io_poll_from_svc(const struct io_svc *svc)
558 {
559  assert(svc);
560 
561  return structof(svc, io_poll_t, svc);
562 }
563 
564 static inline io_poll_t *
565 io_poll_from_poll(const ev_poll_t *poll)
566 {
567  assert(poll);
568 
569  return structof(poll, io_poll_t, poll_vptr);
570 }
571 
572 static int
573 io_poll_open(io_poll_t *poll)
574 {
575  if (io_poll_close(poll) == -1)
576  return -1;
577 
578  return (poll->epfd = epoll_create1(EPOLL_CLOEXEC)) == -1 ? -1 : 0;
579 }
580 
581 static int
582 io_poll_close(io_poll_t *poll)
583 {
584  assert(poll);
585 
586  int epfd = poll->epfd;
587  if (epfd == -1)
588  return 0;
589  poll->epfd = -1;
590 
591  return close(epfd);
592 }
593 
594 static void
595 io_poll_process(io_poll_t *poll, int revents, struct io_poll_watch *watch)
596 {
597  assert(poll);
598  assert(poll->nwatch);
599  assert(watch);
600  assert(watch->_events);
601 
602  watch->_events = 0;
603  poll->nwatch--;
604 
605  if (watch->func) {
606 #if !LELY_NO_THREADS
607  pthread_mutex_unlock(&poll->mtx);
608 #endif
609  watch->func(watch, revents);
610 #if !LELY_NO_THREADS
611  pthread_mutex_lock(&poll->mtx);
612 #endif
613  }
614 }
615 
/*
 * Three-way comparison of two file descriptors, used as the comparison
 * function of the watch tree.
 *
 * p1, p2: pointers to the `int` values to compare (must not be NULL).
 *
 * Returns -1 if *p1 < *p2, 1 if *p1 > *p2, and 0 if both are equal.
 */
static int
io_fd_cmp(const void *p1, const void *p2)
{
	assert(p1);
	assert(p2);

	const int lhs = *(const int *)p1;
	const int rhs = *(const int *)p2;

	if (lhs < rhs)
		return -1;
	if (rhs < lhs)
		return 1;
	return 0;
}
626 
627 static inline struct io_poll_watch *
628 io_poll_watch_from_node(struct rbnode *node)
629 {
630  assert(node);
631 
632  return structof(node, struct io_poll_watch, _node);
633 }
634 
/*
 * The handler installed for the wake-up signal by io_poll_init(). It does
 * nothing; the signal number is ignored.
 */
static void
sig_ign(int signo)
{
	(void)signo;
}
640 
641 #endif // !LELY_NO_STDIO && __linux__
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
io_fork_event
The type of event generated by an I/O context before and after a process fork.
Definition: ctx.h:37
@ IO_FORK_CHILD
The event generated after the fork in the child process.
Definition: ctx.h:43
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:141
const struct ev_poll_vtbl *const ev_poll_t
The abstract polling interface.
Definition: poll.h:32
@ IO_EVENT_HUP
The device has been disconnected.
Definition: event.h:56
@ IO_EVENT_IN
Data (other than priority data) MAY be read without blocking.
Definition: event.h:35
@ IO_EVENT_OUT
Data (both normal and priority data) MAY be written without blocking.
Definition: event.h:46
@ IO_EVENT_ERR
An error has occurred. This event is always reported.
Definition: event.h:48
@ IO_EVENT_PRI
Priority data MAY be read without blocking.
Definition: event.h:40
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread storage duration.
Definition: features.h:249
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition: poll.c:275
ev_poll_t * io_poll_get_poll(const io_poll_t *poll)
Returns a pointer to the ev_poll_t instance corresponding to the I/O polling instance.
Definition: poll.c:281
This header file is part of the I/O library; it contains the I/O polling declarations for POSIX platforms.
int io_poll_watch(io_poll_t *poll, io_handle_t handle, struct io_event *event, int keep)
Registers an I/O device with an I/O polling interface and instructs it to watch for certain events.
Definition: poll.c:252
io_poll_t * io_poll_create(void)
Creates a new I/O polling interface.
Definition: poll.c:218
void io_poll_destroy(io_poll_t *poll)
Destroys an I/O polling interface.
Definition: poll.c:243
void rbnode_init(struct rbnode *node, const void *key)
Initializes a node in a red-black tree.
Definition: rbtree.h:237
void rbtree_insert(struct rbtree *tree, struct rbnode *node)
Inserts a node into a red-black tree.
Definition: rbtree.c:108
void rbtree_init(struct rbtree *tree, rbtree_cmp_t *cmp)
Initializes a red-black tree.
Definition: rbtree.h:248
void rbtree_remove(struct rbtree *tree, struct rbnode *node)
Removes a node from a red-black tree.
Definition: rbtree.c:187
#define rbtree_foreach(tree, node)
Iterates over each node in a red-black tree in ascending order.
Definition: rbtree.h:234
struct rbnode * rbtree_find(const struct rbtree *tree, const void *key)
Finds a node in a red-black tree.
Definition: rbtree.c:306
This is the internal header file of the Windows-specific I/O declarations.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib.h> and defines any missing functionality.
An I/O polling interface.
Definition: poll.c:51
struct rbtree tree
The tree containing the I/O device handles being watched.
Definition: poll.c:57
mtx_t mtx
The mutex protecting tree.
Definition: poll.c:54
int epfd
The epoll file descriptor.
Definition: poll.c:64
Definition: ctx.c:38
An object representing a file descriptor being monitored for I/O events.
Definition: poll.h:56
io_poll_watch_func_t * func
A pointer to the function to be invoked when an I/O event occurs.
Definition: poll.h:58
Definition: poll.c:88
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A node in a red-black tree.
Definition: rbtree.h:53
A red-black tree.
Definition: rbtree.h:91
This header file is part of the C11 and POSIX compatibility library; it includes <unistd.h> and defines any missing functionality.