Lely core libraries  2.2.5
poll.c
Go to the documentation of this file.
1 
24 #include "io.h"
25 
26 #if _WIN32
27 
28 #include <lely/ev/task.h>
29 #include <lely/io2/win32/poll.h>
30 #include <lely/util/errnum.h>
31 #include <lely/util/util.h>
32 
33 #include <assert.h>
34 #include <limits.h>
35 #include <stdlib.h>
36 
#ifndef LELY_IO_IOCP_COUNT
/*
 * The number of OVERLAPPED_ENTRY slots in the buffer that
 * io_poll_poll_wait() places on the stack and passes to
 * GetQueuedCompletionStatusEx(), i.e., the maximum number of completion
 * packets dequeued per call. It is the number of entries that fit in
 * LELY_VLA_SIZE_MAX bytes, with a lower bound of 1. May be overridden by
 * defining the macro before this point.
 */
#define LELY_IO_IOCP_COUNT \
	MAX((LELY_VLA_SIZE_MAX / sizeof(OVERLAPPED_ENTRY)), 1)
#endif
41 
/*
 * The per-thread state handed out by io_poll_poll_self() and consumed by
 * io_poll_poll_wait() and io_poll_poll_kill().
 */
struct io_poll_thrd {
	// Set to TRUE by io_poll_kill_func(); checked and reset to FALSE by
	// io_poll_poll_wait().
	BOOL stopped;
#if !LELY_NO_THREADS
	// Points to the ID of the owning thread (filled in lazily by
	// io_poll_poll_self()); io_poll_poll_kill() uses it to open the thread.
	LPDWORD lpdwThreadId;
#endif
};
48 
#if !LELY_NO_THREADS
// The APC routine queued by io_poll_poll_kill(); only needed when threads are
// enabled.
static void CALLBACK io_poll_kill_func(ULONG_PTR Parameter);
#endif
52 
// The virtual table installed in the I/O service of every polling instance
// (see io_poll_init()). Both entries are NULL.
// NOTE(review): the members of struct io_svc_vtbl are declared elsewhere;
// presumably NULL means "no callback" - confirm against ctx.h.
// clang-format off
static const struct io_svc_vtbl io_poll_svc_vtbl = {
	NULL,
	NULL
};
// clang-format on
59 
// The implementations of the abstract polling interface; they are collected
// in io_poll_poll_vtbl below.
static void *io_poll_poll_self(const ev_poll_t *poll);
static int io_poll_poll_wait(ev_poll_t *poll, int timeout);
static int io_poll_poll_kill(ev_poll_t *poll, void *thr);
63 
64 // clang-format off
65 static const struct ev_poll_vtbl io_poll_poll_vtbl = {
66  &io_poll_poll_self,
67  &io_poll_poll_wait,
68  &io_poll_poll_kill
69 };
70 // clang-format on
71 
// The implementation of an I/O polling instance based on an I/O completion
// port.
struct io_poll {
	// The I/O service registered with #ctx by io_poll_init() and
	// unregistered by io_poll_fini().
	struct io_svc svc;
	// The pointer to the polling virtual table; the address of this member
	// is what io_poll_get_poll() hands out as the ev_poll_t handle.
	const struct ev_poll_vtbl *poll_vptr;
	// The I/O context passed to io_poll_init().
	io_ctx_t *ctx;
	// The I/O completion port created by io_poll_init() and closed by
	// io_poll_fini().
	HANDLE CompletionPort;
};
78 
// Helpers that recover the enclosing io_poll_t from a pointer to one of its
// members (the I/O service or the polling virtual-table pointer).
static inline io_poll_t *io_poll_from_svc(const struct io_svc *svc);
static inline io_poll_t *io_poll_from_poll(const ev_poll_t *poll);
81 
82 void *
83 io_poll_alloc(void)
84 {
85  void *ptr = malloc(sizeof(io_poll_t));
86  if (!ptr)
87  set_errc(errno2c(errno));
88  return ptr;
89 }
90 
/*
 * Releases storage obtained from io_poll_alloc(). The instance is not
 * finalized here.
 */
void
io_poll_free(void *ptr)
{
	// free() is a no-op for NULL, so no guard is required.
	free(ptr);
}
96 
97 io_poll_t *
98 io_poll_init(io_poll_t *poll, io_ctx_t *ctx)
99 {
100  assert(poll);
101  assert(ctx);
102 
103  DWORD dwErrCode = GetLastError();
104 
105  poll->svc = (struct io_svc)IO_SVC_INIT(&io_poll_svc_vtbl);
106  poll->ctx = ctx;
107 
108  poll->poll_vptr = &io_poll_poll_vtbl;
109 
110  poll->CompletionPort = CreateIoCompletionPort(
111  INVALID_HANDLE_VALUE, NULL, 0, 0);
112  if (!poll->CompletionPort) {
113  dwErrCode = GetLastError();
114  goto error_CreateIoCompletionPort;
115  }
116 
117  io_ctx_insert(poll->ctx, &poll->svc);
118 
119  return poll;
120 
121  // CloseHandle(poll->CompletionPort);
122 error_CreateIoCompletionPort:
123  SetLastError(dwErrCode);
124  return NULL;
125 }
126 
127 void
128 io_poll_fini(io_poll_t *poll)
129 {
130  assert(poll);
131 
132  io_ctx_remove(poll->ctx, &poll->svc);
133 
134  CloseHandle(poll->CompletionPort);
135 }
136 
137 io_poll_t *
139 {
140  DWORD dwErrCode = 0;
141 
142  io_poll_t *poll = io_poll_alloc();
143  if (!poll) {
144  dwErrCode = GetLastError();
145  goto error_alloc;
146  }
147 
148  io_poll_t *tmp = io_poll_init(poll, ctx);
149  if (!tmp) {
150  dwErrCode = GetLastError();
151  goto error_init;
152  }
153  poll = tmp;
154 
155  return poll;
156 
157 error_init:
158  io_poll_free(poll);
159 error_alloc:
160  SetLastError(dwErrCode);
161  return NULL;
162 }
163 
164 void
166 {
167  if (poll) {
168  io_poll_fini(poll);
169  io_poll_free(poll);
170  }
171 }
172 
173 io_ctx_t *
174 io_poll_get_ctx(const io_poll_t *poll)
175 {
176  assert(poll);
177 
178  return poll->ctx;
179 }
180 
181 ev_poll_t *
182 io_poll_get_poll(const io_poll_t *poll)
183 {
184  assert(poll);
185 
186  return &poll->poll_vptr;
187 }
188 
189 int
190 io_poll_register_handle(io_poll_t *poll, HANDLE handle)
191 {
192  assert(poll);
193 
194  // clang-format off
195  return CreateIoCompletionPort(handle, poll->CompletionPort, 0, 0)
196  ? 0 : -1;
197  // clang-format on
198 }
199 
200 int
201 io_poll_post(io_poll_t *poll, size_t nbytes, struct io_cp *cp)
202 {
203  assert(poll);
204  assert(cp);
205 
206  // clang-format off
207  return PostQueuedCompletionStatus(poll->CompletionPort, nbytes, 0,
208  &cp->overlapped) ? 0 : -1;
209  // clang-format on
210 }
211 
212 #if !LELY_NO_THREADS
213 static void CALLBACK
214 io_poll_kill_func(ULONG_PTR Parameter)
215 {
216  *(BOOL *)(PVOID)Parameter = TRUE;
217 }
218 #endif
219 
220 static void *
221 io_poll_poll_self(const ev_poll_t *poll)
222 {
223  (void)poll;
224 
225 #if LELY_NO_THREADS
226  static struct io_poll_thrd thr = { 0 };
227 #else
228  static _Thread_local struct io_poll_thrd thr = { 0, NULL };
229  if (!thr.lpdwThreadId) {
230  static _Thread_local DWORD dwThreadId;
231  dwThreadId = GetCurrentThreadId();
232  thr.lpdwThreadId = &dwThreadId;
233  }
234 #endif
235  return &thr;
236 }
237 
238 static int
239 io_poll_poll_wait(ev_poll_t *poll_, int timeout)
240 {
241  io_poll_t *poll = io_poll_from_poll(poll_);
242  void *thr_ = io_poll_poll_self(poll_);
243  struct io_poll_thrd *thr = (struct io_poll_thrd *)thr_;
244 
245  DWORD dwMilliseconds = timeout < 0 ? INFINITE : (DWORD)timeout;
246 
247  int n = 0;
248  DWORD dwErrCode = GetLastError();
249 
250  OVERLAPPED_ENTRY CompletionPortEntries[LELY_IO_IOCP_COUNT];
251  ULONG ulNumEntriesRemoved = 0;
252 
253  do {
254  if (thr->stopped)
255  dwMilliseconds = 0;
256  BOOL fSuccess = GetQueuedCompletionStatusEx(
257  poll->CompletionPort, CompletionPortEntries,
258  LELY_IO_IOCP_COUNT, &ulNumEntriesRemoved,
259  dwMilliseconds, TRUE);
260  // TODO: Check if we can detect an APC from the return code.
261  if (thr->stopped)
262  break;
263 
264  if (!fSuccess) {
265  if (GetLastError() == WAIT_TIMEOUT) {
266  if (dwMilliseconds && timeout < 0)
267  continue;
268  } else if (!n) {
269  dwErrCode = GetLastError();
270  n = -1;
271  }
272  break;
273  }
274 
275  for (ULONG i = 0; i < ulNumEntriesRemoved; i++) {
276  LPOVERLAPPED_ENTRY lpEntry = &CompletionPortEntries[i];
277  LPOVERLAPPED lpOverlapped = lpEntry->lpOverlapped;
278  if (!lpOverlapped)
279  continue;
280  struct io_cp *cp = structof(
281  lpOverlapped, struct io_cp, overlapped);
282  if (cp->func) {
283  assert(lpfnRtlNtStatusToDosError);
284  DWORD dwErrorCode = lpfnRtlNtStatusToDosError(
285  lpOverlapped->Internal);
286  cp->func(cp, lpEntry->dwNumberOfBytesTransferred,
287  dwErrorCode);
288  }
289  n += n < INT_MAX;
290  }
291 
292  dwMilliseconds = 0;
293  } while (ulNumEntriesRemoved == LELY_IO_IOCP_COUNT);
294  thr->stopped = FALSE;
295 
296  SetLastError(dwErrCode);
297  return n;
298 }
299 
300 static int
301 io_poll_poll_kill(ev_poll_t *poll, void *thr_)
302 {
303  (void)poll;
304 #if LELY_NO_THREADS
305  (void)thr_;
306 #else
307  struct io_poll_thrd *thr = thr_;
308  assert(thr);
309 
310  HANDLE hThread = OpenThread(
311  THREAD_SET_CONTEXT, FALSE, *thr->lpdwThreadId);
312  if (!hThread)
313  return -1;
314 
315  if (!QueueUserAPC(&io_poll_kill_func, hThread,
316  (ULONG_PTR)(PVOID)&thr->stopped)) {
317  DWORD dwErrCode = GetLastError();
318  CloseHandle(hThread);
319  SetLastError(dwErrCode);
320  return -1;
321  }
322  CloseHandle(hThread);
323 #endif
324  return 0;
325 }
326 
327 static inline io_poll_t *
328 io_poll_from_svc(const struct io_svc *svc)
329 {
330  assert(svc);
331 
332  return structof(svc, io_poll_t, svc);
333 }
334 
335 static inline io_poll_t *
336 io_poll_from_poll(const ev_poll_t *poll)
337 {
338  assert(poll);
339 
340  return structof(poll, io_poll_t, poll_vptr);
341 }
342 
343 #endif // _WIN32
An I/O polling interface.
Definition: poll.c:48
Definition: ctx.c:35
This header file is part of the I/O library; it contains the I/O polling declarations for Windows...
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:121
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition: poll.c:275
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:957
ev_poll_t * io_poll_get_poll(const io_poll_t *poll)
Returns a pointer to the ev_poll_t instance corresponding to the I/O polling instance.
Definition: poll.c:281
void io_poll_destroy(io_poll_t *poll)
Destroys an I/O polling interface.
Definition: poll.c:240
io_cp_func_t * func
A pointer to the function to be invoked when the I/O operation completes.
Definition: poll.h:59
This header file is part of the utilities library; it contains the native and platform-independent er...
An I/O completion packet.
Definition: poll.h:54
int io_poll_post(io_poll_t *poll, size_t nbytes, struct io_cp *cp)
Posts a completion packet to the I/O completion port of an I/O polling instance.
An I/O service.
Definition: ctx.h:49
const struct ev_poll_vtbl *const ev_poll_t
The abstract polling interface.
Definition: poll.h:32
int io_poll_register_handle(io_poll_t *poll, HANDLE handle)
Registers a file handle with (the I/O completion port of) an I/O polling instance.
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:43
This is the internal header file of the Windows-specific I/O declarations.
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service with an I/O context.
Definition: ctx.c:136
This header file is part of the event library; it contains the task declarations. ...
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
io_poll_t * io_poll_create(void)
Creates a new I/O polling interface.
Definition: poll.c:215
The virtual table of an I/O service.
Definition: ctx.h:67
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread stor...
Definition: features.h:239
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib.h> and defines any missing functionality.
OVERLAPPED overlapped
The OVERLAPPED structure submitted to the asynchronous I/O operation.
Definition: poll.h:63
Definition: poll.c:88
This is the public header file of the utilities library.