Lely core libraries 2.2.5
poll.c
#include "io.h"

#ifdef __linux__

#include <lely/io2/posix/poll.h>
#include <lely/util/util.h>

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdlib.h>

#if !LELY_NO_THREADS
#include <pthread.h>
#endif
#include <unistd.h>

#include <sys/epoll.h>
// clang-format off
#define EPOLL_EVENT_INIT(events, fd) \
	{ \
		(((events) & IO_EVENT_IN) ? (EPOLLIN | EPOLLRDHUP) : 0) \
				| (((events) & IO_EVENT_PRI) ? EPOLLPRI : 0) \
				| (((events) & IO_EVENT_OUT) ? EPOLLOUT : 0) \
				| EPOLLONESHOT, \
		{ .fd = (fd) } \
	}
// clang-format on

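// The maximum number of events returned by a single call to epoll_pwait().
// The default bound keeps the on-stack epoll_event array in
// io_poll_poll_wait() within LELY_VLA_SIZE_MAX bytes.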
#ifndef LELY_IO_EPOLL_MAXEVENTS
#define LELY_IO_EPOLL_MAXEVENTS \
	MAX((LELY_VLA_SIZE_MAX / sizeof(struct epoll_event)), 1)
#endif

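// The per-thread state used by io_poll_poll_self() and io_poll_poll_kill():
// `stopped` is set when the thread should stop (or has stopped) polling, and
// `thread` identifies the thread so it can be interrupted with pthread_kill().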
struct io_poll_thrd {
	int stopped;
#if !LELY_NO_THREADS
	pthread_t *thread;
#endif
};

static int io_poll_svc_notify_fork(struct io_svc *svc, enum io_fork_event e);

// clang-format off
static const struct io_svc_vtbl io_poll_svc_vtbl = {
	&io_poll_svc_notify_fork,
	NULL
};
// clang-format on

static void *io_poll_poll_self(const ev_poll_t *poll);
static int io_poll_poll_wait(ev_poll_t *poll, int timeout);
static int io_poll_poll_kill(ev_poll_t *poll, void *thr);

// clang-format off
static const struct ev_poll_vtbl io_poll_poll_vtbl = {
	&io_poll_poll_self,
	&io_poll_poll_wait,
	&io_poll_poll_kill
};
// clang-format on

// An I/O polling interface.
struct io_poll {
	// The I/O service representing this polling instance.
	struct io_svc svc;
	// A pointer to the virtual table implementing the abstract polling
	// interface (ev_poll_t).
	const struct ev_poll_vtbl *poll_vptr;
	// The I/O context with which this instance is registered.
	io_ctx_t *ctx;
	// The number of the signal used to wake up a polling thread.
	int signo;
	// The signal action in effect before io_poll_init().
	struct sigaction oact;
	// The signal mask in effect before io_poll_init().
	sigset_t oset;
	// The epoll file descriptor.
	int epfd;
#if !LELY_NO_THREADS
	// The mutex protecting tree.
	pthread_mutex_t mtx;
#endif
	// The tree containing the I/O device handles being watched.
	struct rbtree tree;
	// The number of registered watches with a non-empty event set.
	size_t nwatch;
};

static inline io_poll_t *io_poll_from_svc(const struct io_svc *svc);
static inline io_poll_t *io_poll_from_poll(const ev_poll_t *poll);

static int io_poll_open(io_poll_t *poll);
static int io_poll_close(io_poll_t *poll);

static void io_poll_process(
		io_poll_t *poll, int revents, struct io_poll_watch *watch);

static int io_fd_cmp(const void *p1, const void *p2);

static inline struct io_poll_watch *io_poll_watch_from_node(
		struct rbnode *node);

static void sig_ign(int signo);

void *
io_poll_alloc(void)
{
	return malloc(sizeof(io_poll_t));
}

void
io_poll_free(void *ptr)
{
	free(ptr);
}

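// Installs sig_ign() as the handler for the wake-up signal and blocks that
// signal; it remains blocked except while a thread is waiting in
// epoll_pwait(), so the signal can only interrupt the polling call.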
io_poll_t *
io_poll_init(io_poll_t *poll, io_ctx_t *ctx, int signo)
{
	assert(poll);
	assert(ctx);

	if (!signo)
		signo = SIGUSR1;

	int errsv = 0;

	poll->svc = (struct io_svc)IO_SVC_INIT(&io_poll_svc_vtbl);
	poll->ctx = ctx;

	poll->poll_vptr = &io_poll_poll_vtbl;

	poll->signo = signo;

	struct sigaction act;
	act.sa_handler = &sig_ign;
	sigemptyset(&act.sa_mask);
	act.sa_flags = 0;
	if (sigaction(poll->signo, &act, &poll->oact) == -1) {
		errsv = errno;
		goto error_sigaction;
	}

	// Block the wake up signal so it is only delivered during polling.
	sigset_t set;
	sigemptyset(&set);
	sigaddset(&set, poll->signo);
#if LELY_NO_THREADS
	if (sigprocmask(SIG_BLOCK, &set, &poll->oset) == -1) {
		errsv = errno;
#else
	if ((errsv = pthread_sigmask(SIG_BLOCK, &set, &poll->oset))) {
#endif
		goto error_sigmask;
	}

	poll->epfd = -1;
#if !LELY_NO_THREADS
	if ((errsv = pthread_mutex_init(&poll->mtx, NULL)))
		goto error_init_mtx;
#endif

	rbtree_init(&poll->tree, &io_fd_cmp);
	poll->nwatch = 0;

	if (io_poll_open(poll) == -1) {
		errsv = errno;
		goto error_open;
	}

	io_ctx_insert(poll->ctx, &poll->svc);

	return poll;

	// io_poll_close(poll);
error_open:
#if !LELY_NO_THREADS
	pthread_mutex_destroy(&poll->mtx);
error_init_mtx:
#endif
#if LELY_NO_THREADS
	sigprocmask(SIG_SETMASK, &poll->oset, NULL);
#else
	pthread_sigmask(SIG_SETMASK, &poll->oset, NULL);
#endif
error_sigmask:
	sigaction(poll->signo, &poll->oact, NULL);
error_sigaction:
	errno = errsv;
	return NULL;
}

void
io_poll_fini(io_poll_t *poll)
{
	assert(poll);

	io_ctx_remove(poll->ctx, &poll->svc);

	io_poll_close(poll);

#if !LELY_NO_THREADS
	pthread_mutex_destroy(&poll->mtx);
#endif

	// Clear any pending (and currently blocked) wake up signal.
	sigset_t set;
	sigemptyset(&set);
	sigaddset(&set, poll->signo);
	int errsv = errno;
	while (sigtimedwait(&set, &(siginfo_t){ 0 }, &(struct timespec){ 0, 0 })
			== poll->signo)
		;
	errno = errsv;
	// Unblock the wake up signal and restore the original signal handler.
#if LELY_NO_THREADS
	sigprocmask(SIG_SETMASK, &poll->oset, NULL);
#else
	pthread_sigmask(SIG_SETMASK, &poll->oset, NULL);
#endif
	sigaction(poll->signo, &poll->oact, NULL);
}

io_poll_t *
io_poll_create(io_ctx_t *ctx, int signo)
{
	int errsv = 0;

	io_poll_t *poll = io_poll_alloc();
	if (!poll) {
		errsv = errno;
		goto error_alloc;
	}

	io_poll_t *tmp = io_poll_init(poll, ctx, signo);
	if (!tmp) {
		errsv = errno;
		goto error_init;
	}
	poll = tmp;

	return poll;

error_init:
	io_poll_free(poll);
error_alloc:
	errno = errsv;
	return NULL;
}

void
io_poll_destroy(io_poll_t *poll)
{
	if (poll) {
		io_poll_fini(poll);
		io_poll_free(poll);
	}
}

io_ctx_t *
io_poll_get_ctx(const io_poll_t *poll)
{
	return poll->ctx;
}

ev_poll_t *
io_poll_get_poll(const io_poll_t *poll)
{
	return &poll->poll_vptr;
}

int
io_poll_get_fd(const io_poll_t *poll)
{
	assert(poll);

	return poll->epfd;
}

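// Registers, rearms or removes a (one-shot) watch for fd. A non-zero events
// mask (re)arms the watch; a zero mask removes it. Watching a file descriptor
// that is already registered with a different watch object fails with EEXIST.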
int
io_poll_watch(io_poll_t *poll, int fd, int events, struct io_poll_watch *watch)
{
	assert(poll);
	assert(watch);
	int epfd = poll->epfd;

	if (fd == -1 || fd == epfd) {
		errno = EBADF;
		return -1;
	}

	if (events < 0) {
		errno = EINVAL;
		return -1;
	}
	events &= IO_EVENT_MASK;

	int result = -1;
	int errsv = errno;
#if !LELY_NO_THREADS
	pthread_mutex_lock(&poll->mtx);
#endif

	struct rbnode *node = rbtree_find(&poll->tree, &fd);
	if (node && node != &watch->_node) {
		errsv = EEXIST;
		goto error;
	}

	if (events) {
		struct epoll_event event = EPOLL_EVENT_INIT(events, fd);
		if (node && events != watch->_events) {
			if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &event) == -1) {
				errsv = errno;
				epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
				rbtree_remove(&poll->tree, node);
				if (watch->_events)
					poll->nwatch--;
				watch->_events = 0;
				goto error;
			}
		} else if (!node) {
			if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) == -1) {
				errsv = errno;
				goto error;
			}
			watch->_fd = fd;
			rbnode_init(&watch->_node, &watch->_fd);
			watch->_events = 0;
			rbtree_insert(&poll->tree, &watch->_node);
		}
		if (!watch->_events)
			poll->nwatch++;
		watch->_events = events;
	} else if (node) {
		epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
		rbtree_remove(&poll->tree, node);
		if (watch->_events)
			poll->nwatch--;
		watch->_events = 0;
	}

	result = 0;

error:
#if !LELY_NO_THREADS
	pthread_mutex_unlock(&poll->mtx);
#endif
	errno = errsv;
	return result;
}

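// Recreates the epoll instance in the child process after a fork and
// re-registers every armed watch with the new epoll instance. Watches that
// cannot be re-registered (and unarmed watches) are dropped from the tree.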
static int
io_poll_svc_notify_fork(struct io_svc *svc, enum io_fork_event e)
{
	io_poll_t *poll = io_poll_from_svc(svc);

	if (e != IO_FORK_CHILD)
		return 0;

	int result = 0;
	int errsv = errno;

	if (io_poll_close(poll) == -1 && !result) {
		errsv = errno;
		result = -1;
	}

	if (io_poll_open(poll) == -1 && !result) {
		errsv = errno;
		result = -1;
	}

	int epfd = poll->epfd;
	rbtree_foreach (&poll->tree, node) {
		struct io_poll_watch *watch = io_poll_watch_from_node(node);
		int fd = watch->_fd;
		int events = watch->_events;
		if (events) {
			struct epoll_event event = EPOLL_EVENT_INIT(events, fd);

			if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event) == -1) {
				if (!result) {
					errsv = errno;
					result = -1;
				}
				rbtree_remove(&poll->tree, node);
			}
		} else {
			rbtree_remove(&poll->tree, node);
		}
	}

	errno = errsv;
	return result;
}

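// Returns a pointer to the thread-local state identifying the calling thread,
// which io_poll_poll_kill() uses to interrupt a thread blocked in
// io_poll_poll_wait().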
static void *
io_poll_poll_self(const ev_poll_t *poll)
{
	(void)poll;

#if LELY_NO_THREADS
	static struct io_poll_thrd thr = { 0 };
#else
	static _Thread_local struct io_poll_thrd thr = { 0, NULL };
	if (!thr.thread) {
		static _Thread_local pthread_t thread;
		thread = pthread_self();
		thr.thread = &thread;
	}
#endif
	return &thr;
}

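// Waits at most timeout milliseconds for I/O events and processes them. The
// wake-up signal is unblocked for the duration of epoll_pwait() (the empty
// signal mask replaces the thread's mask), so another thread can interrupt
// the wait with io_poll_poll_kill(). Returns the number of events processed,
// or -1 on error (with errno set).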
static int
io_poll_poll_wait(ev_poll_t *poll_, int timeout)
{
	io_poll_t *poll = io_poll_from_poll(poll_);
	void *thr_ = io_poll_poll_self(poll_);
	struct io_poll_thrd *thr = (struct io_poll_thrd *)thr_;

	int n = 0;
	int errsv = errno;

	sigset_t set;
	sigemptyset(&set);

	struct epoll_event events[LELY_IO_EPOLL_MAXEVENTS];

#if !LELY_NO_THREADS
	pthread_mutex_lock(&poll->mtx);
#endif
	if (!timeout)
		thr->stopped = 1;

	int stopped = 0;
	do {
		if (thr->stopped) {
			if (!poll->nwatch)
				break;
			timeout = 0;
		}

#if !LELY_NO_THREADS
		pthread_mutex_unlock(&poll->mtx);
#endif
		errno = 0;
		int nevents = epoll_pwait(poll->epfd, events,
				LELY_IO_EPOLL_MAXEVENTS, timeout, &set);
		if (nevents == -1 && errno == EINTR) {
			// If the wait is interrupted by a signal, we don't
			// receive any events. This can result in starvation if
			// too many signals are generated (e.g., when too many
			// tasks are queued). To prevent starvation, we poll for
			// events again, but this time with the interrupt signal
			// blocked (and timeout 0).
			sigaddset(&set, poll->signo);
#if !LELY_NO_THREADS
			pthread_mutex_lock(&poll->mtx);
#endif
			thr->stopped = 1;
			continue;
		}
		if (nevents == -1) {
			if (!n) {
				errsv = errno;
				n = -1;
			}
#if !LELY_NO_THREADS
			pthread_mutex_lock(&poll->mtx);
#endif
			break;
		}

		// Translate the epoll events to I/O events and invoke the
		// callback of the corresponding watch, if it is still
		// registered.
		for (int i = 0; i < nevents; i++) {
			int revents = 0;
			if (events[i].events & (EPOLLIN | EPOLLRDHUP))
				revents |= IO_EVENT_IN;
			if (events[i].events & EPOLLPRI)
				revents |= IO_EVENT_PRI;
			if (events[i].events & EPOLLOUT)
				revents |= IO_EVENT_OUT;
			if (events[i].events & EPOLLERR)
				revents |= IO_EVENT_ERR;
			if (events[i].events & EPOLLHUP)
				revents |= IO_EVENT_HUP;
			int fd = events[i].data.fd;
#if !LELY_NO_THREADS
			pthread_mutex_lock(&poll->mtx);
#endif
			struct rbnode *node = rbtree_find(&poll->tree, &fd);
			if (node) {
				struct io_poll_watch *watch =
						io_poll_watch_from_node(node);
				io_poll_process(poll, revents, watch);
				n += n < INT_MAX;
			}
#if !LELY_NO_THREADS
			pthread_mutex_unlock(&poll->mtx);
#endif
		}

#if !LELY_NO_THREADS
		pthread_mutex_lock(&poll->mtx);
#endif
		thr->stopped = 1;
		// Keep draining events (with timeout 0) as long as the event
		// buffer was filled completely; more events may be pending.
		stopped = nevents != LELY_IO_EPOLL_MAXEVENTS;
	} while (!stopped);
	thr->stopped = 0;
#if !LELY_NO_THREADS
	pthread_mutex_unlock(&poll->mtx);
#endif

	errno = errsv;
	return n;
}

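// Interrupts a thread blocked in io_poll_poll_wait(). Marks the target thread
// as stopped and, unless it is the calling thread or was already stopped,
// delivers the wake-up signal with pthread_kill() to interrupt epoll_pwait().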
static int
io_poll_poll_kill(ev_poll_t *poll_, void *thr_)
{
#if !LELY_NO_THREADS
	io_poll_t *poll = io_poll_from_poll(poll_);
#endif
	struct io_poll_thrd *thr = thr_;
	assert(thr);

	if (thr_ == io_poll_poll_self(poll_))
		return 0;

#if !LELY_NO_THREADS
	pthread_mutex_lock(&poll->mtx);
#endif
	int stopped = thr->stopped;
	if (!stopped)
		thr->stopped = 1;
#if !LELY_NO_THREADS
	pthread_mutex_unlock(&poll->mtx);
	if (!stopped) {
		int errsv = pthread_kill(*thr->thread, poll->signo);
		if (errsv) {
			errno = errsv;
			return -1;
		}
	}
#endif
	return 0;
}

static inline io_poll_t *
io_poll_from_svc(const struct io_svc *svc)
{
	assert(svc);

	return structof(svc, io_poll_t, svc);
}

static inline io_poll_t *
io_poll_from_poll(const ev_poll_t *poll)
{
	assert(poll);

	return structof(poll, io_poll_t, poll_vptr);
}

static int
io_poll_open(io_poll_t *poll)
{
	if (io_poll_close(poll) == -1)
		return -1;

	return (poll->epfd = epoll_create1(EPOLL_CLOEXEC)) == -1 ? -1 : 0;
}

static int
io_poll_close(io_poll_t *poll)
{
	assert(poll);

	int epfd = poll->epfd;
	if (epfd == -1)
		return 0;
	poll->epfd = -1;

	return close(epfd);
}

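// Disarms the (one-shot) watch and invokes its callback, if any. The mutex is
// released while the callback runs, so the callback may safely call
// io_poll_watch() to rearm the watch.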
static void
io_poll_process(io_poll_t *poll, int revents, struct io_poll_watch *watch)
{
	assert(poll);
	assert(poll->nwatch);
	assert(watch);
	assert(watch->_events);

	watch->_events = 0;
	poll->nwatch--;

	if (watch->func) {
#if !LELY_NO_THREADS
		pthread_mutex_unlock(&poll->mtx);
#endif
		watch->func(watch, revents);
#if !LELY_NO_THREADS
		pthread_mutex_lock(&poll->mtx);
#endif
	}
}

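// Compares two file descriptors (the keys of the watch tree); the
// (fd2 < fd1) - (fd1 < fd2) idiom yields -1, 0 or +1 without risking
// overflow.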
static int
io_fd_cmp(const void *p1, const void *p2)
{
	assert(p1);
	int fd1 = *(const int *)p1;
	assert(p2);
	int fd2 = *(const int *)p2;

	return (fd2 < fd1) - (fd1 < fd2);
}

static inline struct io_poll_watch *
io_poll_watch_from_node(struct rbnode *node)
{
	assert(node);

	return structof(node, struct io_poll_watch, _node);
}

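// A no-op signal handler. The wake-up signal needs an actual handler rather
// than SIG_IGN, since an ignored signal would be discarded instead of
// interrupting epoll_pwait() with EINTR.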
static void
sig_ign(int signo)
{
	(void)signo;
}

#endif // __linux__