Lely core libraries 2.3.4
poll.c
Go to the documentation of this file.
1
24#include "io.h"
25
26#if !LELY_NO_STDIO && _WIN32
27
28#include <lely/ev/task.h>
29#include <lely/io2/win32/poll.h>
30#include <lely/util/errnum.h>
31#include <lely/util/util.h>
32
33#include <assert.h>
34#include <limits.h>
35#include <stdlib.h>
36
/*
 * The maximum number of completion packets retrieved from the I/O completion
 * port by a single call to GetQueuedCompletionStatusEx(). The default is the
 * number of OVERLAPPED_ENTRY structs that fit in LELY_VLA_SIZE_MAX bytes, but
 * never less than 1. It can be overridden at compile time.
 */
#ifndef LELY_IO_IOCP_COUNT
#define LELY_IO_IOCP_COUNT \
	MAX((LELY_VLA_SIZE_MAX / sizeof(OVERLAPPED_ENTRY)), 1)
#endif
41
42struct io_poll_thrd {
43 BOOL stopped;
44#if !LELY_NO_THREADS
45 LPDWORD lpdwThreadId;
46#endif
47};
48
49#if !LELY_NO_THREADS
50static void CALLBACK io_poll_kill_func(ULONG_PTR Parameter);
51#endif
52
53// clang-format off
54static const struct io_svc_vtbl io_poll_svc_vtbl = {
55 NULL,
56 NULL
57};
58// clang-format on
59
60static void *io_poll_poll_self(const ev_poll_t *poll);
61static int io_poll_poll_wait(ev_poll_t *poll, int timeout);
62static int io_poll_poll_kill(ev_poll_t *poll, void *thr);
63
64// clang-format off
65static const struct ev_poll_vtbl io_poll_poll_vtbl = {
66 &io_poll_poll_self,
67 &io_poll_poll_wait,
68 &io_poll_poll_kill
69};
70// clang-format on
71
72struct io_poll {
73 struct io_svc svc;
74 const struct ev_poll_vtbl *poll_vptr;
75 io_ctx_t *ctx;
76 HANDLE CompletionPort;
77};
78
79static inline io_poll_t *io_poll_from_svc(const struct io_svc *svc);
80static inline io_poll_t *io_poll_from_poll(const ev_poll_t *poll);
81
82void *
83io_poll_alloc(void)
84{
85 void *ptr = malloc(sizeof(io_poll_t));
86 if (!ptr)
87 set_errc(errno2c(errno));
88 return ptr;
89}
90
/**
 * Releases memory obtained from io_poll_alloc(). The instance is not
 * finalized; passing NULL is a no-op.
 *
 * @param ptr a pointer to the memory to be released (can be NULL).
 */
void
io_poll_free(void *ptr)
{
	free(ptr);
}
96
98io_poll_init(io_poll_t *poll, io_ctx_t *ctx)
99{
100 assert(poll);
101 assert(ctx);
102
103 DWORD dwErrCode = GetLastError();
104
105 poll->svc = (struct io_svc)IO_SVC_INIT(&io_poll_svc_vtbl);
106 poll->ctx = ctx;
107
108 poll->poll_vptr = &io_poll_poll_vtbl;
109
110 poll->CompletionPort = CreateIoCompletionPort(
111 INVALID_HANDLE_VALUE, NULL, 0, 0);
112 if (!poll->CompletionPort) {
113 dwErrCode = GetLastError();
114 goto error_CreateIoCompletionPort;
115 }
116
117 io_ctx_insert(poll->ctx, &poll->svc);
118
119 return poll;
120
121 // CloseHandle(poll->CompletionPort);
122error_CreateIoCompletionPort:
123 SetLastError(dwErrCode);
124 return NULL;
125}
126
127void
128io_poll_fini(io_poll_t *poll)
129{
130 assert(poll);
131
132 io_ctx_remove(poll->ctx, &poll->svc);
133
134 CloseHandle(poll->CompletionPort);
135}
136
137io_poll_t *
139{
140 DWORD dwErrCode = 0;
141
142 io_poll_t *poll = io_poll_alloc();
143 if (!poll) {
144 dwErrCode = GetLastError();
145 goto error_alloc;
146 }
147
148 io_poll_t *tmp = io_poll_init(poll, ctx);
149 if (!tmp) {
150 dwErrCode = GetLastError();
151 goto error_init;
152 }
153 poll = tmp;
154
155 return poll;
156
157error_init:
158 io_poll_free(poll);
159error_alloc:
160 SetLastError(dwErrCode);
161 return NULL;
162}
163
164void
166{
167 if (poll) {
168 io_poll_fini(poll);
169 io_poll_free(poll);
170 }
171}
172
173io_ctx_t *
174io_poll_get_ctx(const io_poll_t *poll)
175{
176 assert(poll);
177
178 return poll->ctx;
179}
180
181ev_poll_t *
182io_poll_get_poll(const io_poll_t *poll)
183{
184 assert(poll);
185
186 return &poll->poll_vptr;
187}
188
189int
190io_poll_register_handle(io_poll_t *poll, HANDLE handle)
191{
192 assert(poll);
193
194 // clang-format off
195 return CreateIoCompletionPort(handle, poll->CompletionPort, 0, 0)
196 ? 0 : -1;
197 // clang-format on
198}
199
200int
201io_poll_post(io_poll_t *poll, size_t nbytes, struct io_cp *cp)
202{
203 assert(poll);
204 assert(cp);
205
206 // clang-format off
207 return PostQueuedCompletionStatus(poll->CompletionPort, nbytes, 0,
208 &cp->overlapped) ? 0 : -1;
209 // clang-format on
210}
211
212#if !LELY_NO_THREADS
213static void CALLBACK
214io_poll_kill_func(ULONG_PTR Parameter)
215{
216 *(BOOL *)(PVOID)Parameter = TRUE;
217}
218#endif
219
220static void *
221io_poll_poll_self(const ev_poll_t *poll)
222{
223 (void)poll;
224
225#if LELY_NO_THREADS
226 static struct io_poll_thrd thr = { 0 };
227#else
228 static _Thread_local struct io_poll_thrd thr = { 0, NULL };
229 if (!thr.lpdwThreadId) {
230 static _Thread_local DWORD dwThreadId;
231 dwThreadId = GetCurrentThreadId();
232 thr.lpdwThreadId = &dwThreadId;
233 }
234#endif
235 return &thr;
236}
237
238static int
239io_poll_poll_wait(ev_poll_t *poll_, int timeout)
240{
241 io_poll_t *poll = io_poll_from_poll(poll_);
242 void *thr_ = io_poll_poll_self(poll_);
243 struct io_poll_thrd *thr = (struct io_poll_thrd *)thr_;
244
245 DWORD dwMilliseconds = timeout < 0 ? INFINITE : (DWORD)timeout;
246
247 int n = 0;
248 DWORD dwErrCode = GetLastError();
249
250 OVERLAPPED_ENTRY CompletionPortEntries[LELY_IO_IOCP_COUNT];
251 ULONG ulNumEntriesRemoved = 0;
252
253 do {
254 if (thr->stopped)
255 dwMilliseconds = 0;
256 BOOL fSuccess = GetQueuedCompletionStatusEx(
257 poll->CompletionPort, CompletionPortEntries,
258 LELY_IO_IOCP_COUNT, &ulNumEntriesRemoved,
259 dwMilliseconds, TRUE);
260 // TODO: Check if we can detect an APC from the return code.
261 if (thr->stopped)
262 break;
263
264 if (!fSuccess) {
265 if (GetLastError() == WAIT_TIMEOUT) {
266 if (dwMilliseconds && timeout < 0)
267 continue;
268 } else if (!n) {
269 dwErrCode = GetLastError();
270 n = -1;
271 }
272 break;
273 }
274
275 for (ULONG i = 0; i < ulNumEntriesRemoved; i++) {
276 LPOVERLAPPED_ENTRY lpEntry = &CompletionPortEntries[i];
277 LPOVERLAPPED lpOverlapped = lpEntry->lpOverlapped;
278 if (!lpOverlapped)
279 continue;
280 struct io_cp *cp = structof(
281 lpOverlapped, struct io_cp, overlapped);
282 if (cp->func) {
283 assert(lpfnRtlNtStatusToDosError);
284 DWORD dwErrorCode = lpfnRtlNtStatusToDosError(
285 lpOverlapped->Internal);
286 cp->func(cp, lpEntry->dwNumberOfBytesTransferred,
287 dwErrorCode);
288 }
289 n += n < INT_MAX;
290 }
291
292 dwMilliseconds = 0;
293 } while (ulNumEntriesRemoved == LELY_IO_IOCP_COUNT);
294 thr->stopped = FALSE;
295
296 SetLastError(dwErrCode);
297 return n;
298}
299
300static int
301io_poll_poll_kill(ev_poll_t *poll, void *thr_)
302{
303 (void)poll;
304#if LELY_NO_THREADS
305 (void)thr_;
306#else
307 struct io_poll_thrd *thr = thr_;
308 assert(thr);
309
310 HANDLE hThread = OpenThread(
311 THREAD_SET_CONTEXT, FALSE, *thr->lpdwThreadId);
312 if (!hThread)
313 return -1;
314
315 if (!QueueUserAPC(&io_poll_kill_func, hThread,
316 (ULONG_PTR)(PVOID)&thr->stopped)) {
317 DWORD dwErrCode = GetLastError();
318 CloseHandle(hThread);
319 SetLastError(dwErrCode);
320 return -1;
321 }
322 CloseHandle(hThread);
323#endif
324 return 0;
325}
326
327static inline io_poll_t *
328io_poll_from_svc(const struct io_svc *svc)
329{
330 assert(svc);
331
332 return structof(svc, io_poll_t, svc);
333}
334
335static inline io_poll_t *
336io_poll_from_poll(const ev_poll_t *poll)
337{
338 assert(poll);
339
340 return structof(poll, io_poll_t, poll_vptr);
341}
342
343#endif // !LELY_NO_STDIO && _WIN32
void io_ctx_insert(io_ctx_t *ctx, struct io_svc *svc)
Registers an I/O service with an I/O context.
Definition: ctx.c:126
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
void io_ctx_remove(io_ctx_t *ctx, struct io_svc *svc)
Unregisters an I/O service from an I/O context.
Definition: ctx.c:141
This header file is part of the utilities library; it contains the native and platform-independent er...
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
const struct ev_poll_vtbl *const ev_poll_t
The abstract polling interface.
Definition: poll.h:32
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread stor...
Definition: features.h:249
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
ev_poll_t * io_poll_get_poll(const io_poll_t *poll)
Returns a pointer to the ev_poll_t instance corresponding to the I/O polling instance.
Definition: poll.c:281
io_ctx_t * io_poll_get_ctx(const io_poll_t *poll)
Returns a pointer to the I/O context with which the I/O polling instance is registered.
Definition: poll.c:275
This header file is part of the I/O library; it contains the I/O polling declarations for Windows.
int io_poll_register_handle(io_poll_t *poll, HANDLE handle)
Registers a file handle with (the I/O completion port of) an I/O polling instance.
int io_poll_post(io_poll_t *poll, size_t nbytes, struct io_cp *cp)
Posts a completion packet to the I/O completion port of an I/O polling instance.
void io_poll_destroy(io_poll_t *poll)
Destroys an I/O polling interface.
Definition: poll.c:243
io_poll_t * io_poll_create(void)
Creates a new I/O polling interface.
Definition: poll.c:218
This is the internal header file of the Windows-specific I/O declarations.
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
An I/O polling interface.
Definition: poll.c:51
An I/O completion packet.
Definition: poll.h:54
io_cp_func_t * func
A pointer to the function to be invoked when the I/O operation completes.
Definition: poll.h:59
OVERLAPPED overlapped
The OVERLAPPED structure submitted to the asynchronous I/O operation.
Definition: poll.h:63
Definition: ctx.c:38
Definition: poll.c:88
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
This header file is part of the event library; it contains the task declarations.