Lely core libraries 2.3.4
strand.c
Go to the documentation of this file.
1
24#include "ev.h"
25
26#if !LELY_NO_MALLOC
27
28#if !LELY_NO_THREADS
29#include <lely/libc/stdatomic.h>
30#include <lely/libc/threads.h>
31#endif
32#include <lely/ev/exec.h>
33#include <lely/ev/strand.h>
34#include <lely/ev/task.h>
35#include <lely/util/errnum.h>
36#include <lely/util/util.h>
37
38#include <assert.h>
39#include <stdint.h>
40#include <stdlib.h>
41
42static void ev_strand_exec_on_task_init(ev_exec_t *exec);
43static void ev_strand_exec_on_task_fini(ev_exec_t *exec);
44static int ev_strand_exec_dispatch(ev_exec_t *exec, struct ev_task *task);
45static void ev_strand_exec_defer(ev_exec_t *exec, struct ev_task *task);
46static size_t ev_strand_exec_abort(ev_exec_t *exec, struct ev_task *task);
47
48// clang-format off
49static const struct ev_exec_vtbl ev_strand_exec_vtbl = {
50 &ev_strand_exec_on_task_init,
51 &ev_strand_exec_on_task_fini,
52 &ev_strand_exec_dispatch,
53 &ev_strand_exec_defer,
54 &ev_strand_exec_defer,
55 &ev_strand_exec_abort,
56 NULL
57};
58// clang-format on
59
60struct ev_strand {
61 const struct ev_exec_vtbl *exec_vptr;
62 ev_exec_t *inner_exec;
63 struct ev_task task;
64#if !LELY_NO_THREADS
65 mtx_t mtx;
66#endif
67 int posted;
68 struct sllist queue;
69 const int *thr;
70};
71
72static void ev_strand_func(struct ev_task *task);
73
74static inline struct ev_strand *ev_strand_from_exec(const ev_exec_t *exec);
75
76#if LELY_NO_THREADS
77static const int ev_strand_thrd;
78#else
79static _Thread_local const int ev_strand_thrd;
80#endif
81
82void *
83ev_strand_alloc(void)
84{
85 struct ev_strand *strand = malloc(sizeof(*strand));
86#if !LELY_NO_ERRNO
87 if (!strand)
88 set_errc(errno2c(errno));
89#endif
90 // cppcheck-suppress memleak symbolName=strand
91 return strand ? &strand->exec_vptr : NULL;
92}
93
/*
 * Releases the storage obtained from ev_strand_alloc().
 *
 * ptr is the handle returned by ev_strand_alloc(), i.e. the address of the
 * exec_vptr member. That member is the first one in struct ev_strand, so the
 * handle can be passed to free() as is. A null pointer is accepted, since
 * free() ignores it.
 */
void
ev_strand_free(void *ptr)
{
	free(ptr);
}
99
100ev_exec_t *
101ev_strand_init(ev_exec_t *exec, ev_exec_t *inner_exec)
102{
103 struct ev_strand *strand = ev_strand_from_exec(exec);
104 assert(inner_exec);
105
106 strand->exec_vptr = &ev_strand_exec_vtbl;
107
108 strand->inner_exec = inner_exec;
109 strand->task = (struct ev_task)EV_TASK_INIT(
110 strand->inner_exec, &ev_strand_func);
111
112#if !LELY_NO_THREADS
113 if (mtx_init(&strand->mtx, mtx_plain) != thrd_success)
114 return NULL;
115#endif
116
117 strand->posted = 0;
118
119 sllist_init(&strand->queue);
120
121 strand->thr = NULL;
122
123 return exec;
124}
125
126void
127ev_strand_fini(ev_exec_t *exec)
128{
129 struct ev_strand *strand = ev_strand_from_exec(exec);
130
131 ev_strand_exec_abort(exec, NULL);
132
133#if !LELY_NO_THREADS
134 mtx_lock(&strand->mtx);
135#endif
136 // Abort ev_strand_func().
137 if (strand->posted && ev_exec_abort(strand->task.exec, &strand->task))
138 strand->posted = 0;
139#if !LELY_NO_THREADS
140 // If necessary, busy-wait until ev_strand_func() completes.
141 while (strand->posted) {
142 mtx_unlock(&strand->mtx);
143 thrd_yield();
144 mtx_lock(&strand->mtx);
145 }
146 mtx_unlock(&strand->mtx);
147
148 mtx_destroy(&strand->mtx);
149#endif
150}
151
152ev_exec_t *
154{
155 int errc = 0;
156
157 ev_exec_t *exec = ev_strand_alloc();
158 if (!exec) {
159 errc = get_errc();
160 goto error_alloc;
161 }
162
163 ev_exec_t *tmp = ev_strand_init(exec, inner_exec);
164 if (!tmp) {
165 errc = get_errc();
166 goto error_init;
167 }
168 exec = tmp;
169
170 return exec;
171
172error_alloc:
173 ev_strand_free((void *)exec);
174error_init:
175 set_errc(errc);
176 return NULL;
177}
178
179void
181{
182 if (exec) {
183 ev_strand_fini(exec);
184 ev_strand_free((void *)exec);
185 }
186}
187
188ev_exec_t *
190{
191 struct ev_strand *strand = ev_strand_from_exec(exec);
192
193 return strand->inner_exec;
194}
195
196static void
197ev_strand_exec_on_task_init(ev_exec_t *exec)
198{
199 struct ev_strand *strand = ev_strand_from_exec(exec);
200
201 ev_exec_on_task_init(strand->inner_exec);
202}
203
204static void
205ev_strand_exec_on_task_fini(ev_exec_t *exec)
206{
207 struct ev_strand *strand = ev_strand_from_exec(exec);
208
209 ev_exec_on_task_fini(strand->inner_exec);
210}
211
212static int
213ev_strand_exec_dispatch(ev_exec_t *exec, struct ev_task *task)
214{
215 struct ev_strand *strand = ev_strand_from_exec(exec);
216 assert(task);
217 assert(!task->exec || task->exec == exec);
218
219 if (!task->exec)
220 task->exec = exec;
221 ev_strand_exec_on_task_init(exec);
222
223#if !LELY_NO_THREADS
224 mtx_lock(&strand->mtx);
225#endif
226 if (strand->thr == &ev_strand_thrd) {
227#if !LELY_NO_THREADS
228 mtx_unlock(&strand->mtx);
229#endif
230 if (task->func)
231 task->func(task);
232 ev_strand_exec_on_task_fini(exec);
233 return 1;
234 } else {
235 sllist_push_back(&strand->queue, &task->_node);
236 int post = !strand->posted;
237 strand->posted = 1;
238#if !LELY_NO_THREADS
239 mtx_unlock(&strand->mtx);
240#endif
241 if (post)
242 ev_exec_post(strand->task.exec, &strand->task);
243 return 0;
244 }
245}
246
247static void
248ev_strand_exec_defer(ev_exec_t *exec, struct ev_task *task)
249{
250 struct ev_strand *strand = ev_strand_from_exec(exec);
251 assert(task);
252 assert(!task->exec || task->exec == exec);
253
254 if (!task->exec)
255 task->exec = exec;
256 ev_strand_exec_on_task_init(exec);
257
258#if !LELY_NO_THREADS
259 mtx_lock(&strand->mtx);
260#endif
261 sllist_push_back(&strand->queue, &task->_node);
262 int post = !strand->posted;
263 strand->posted = 1;
264#if !LELY_NO_THREADS
265 mtx_unlock(&strand->mtx);
266#endif
267 if (post)
268 ev_exec_post(strand->task.exec, &strand->task);
269}
270
271static size_t
272ev_strand_exec_abort(ev_exec_t *exec, struct ev_task *task)
273{
274 struct ev_strand *strand = ev_strand_from_exec(exec);
275
276 struct sllist queue;
277 sllist_init(&queue);
278
279#if !LELY_NO_THREADS
280 mtx_lock(&strand->mtx);
281#endif
282 if (!task)
283 sllist_append(&queue, &strand->queue);
284 else if (sllist_remove(&strand->queue, &task->_node))
285 sllist_push_back(&queue, &task->_node);
286#if !LELY_NO_THREADS
287 mtx_unlock(&strand->mtx);
288#endif
289
290 size_t n = 0;
291 while (sllist_pop_front(&queue)) {
292 ev_strand_exec_on_task_fini(exec);
293 n += n < SIZE_MAX;
294 }
295 return n;
296}
297
298static void
299ev_strand_func(struct ev_task *task)
300{
301 assert(task);
302 struct ev_strand *strand = structof(task, struct ev_strand, task);
303 ev_exec_t *exec = &strand->exec_vptr;
304
305#if !LELY_NO_THREADS
306 mtx_lock(&strand->mtx);
307#endif
308 task = ev_task_from_node(sllist_pop_front(&strand->queue));
309 if (task) {
310 assert(!strand->thr);
311 strand->thr = &ev_strand_thrd;
312#if !LELY_NO_THREADS
313 mtx_unlock(&strand->mtx);
314#endif
315 assert(task->exec == exec);
316 if (task->func)
317 task->func(task);
318 ev_strand_exec_on_task_fini(exec);
319#if !LELY_NO_THREADS
320 mtx_lock(&strand->mtx);
321#endif
322 assert(strand->thr == &ev_strand_thrd);
323 strand->thr = NULL;
324 }
325 assert(strand->posted == 1);
326 int post = strand->posted = !sllist_empty(&strand->queue);
327#if !LELY_NO_THREADS
328 mtx_unlock(&strand->mtx);
329#endif
330 if (post)
331 ev_exec_post(strand->task.exec, &strand->task);
332}
333
334static inline struct ev_strand *
335ev_strand_from_exec(const ev_exec_t *exec)
336{
337 assert(exec);
338
339 return structof(exec, struct ev_strand, exec_vptr);
340}
341
342#endif // !LELY_NO_MALLOC
This header file is part of the utilities library; it contains the native and platform-independent er...
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition errnum.c:932
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition errnum.c:944
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition errnum.c:46
This header file is part of the event library; it contains the abstract task executor interface.
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition exec.h:136
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition exec.h:124
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition exec.h:112
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition exec.h:106
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread stor...
Definition features.h:249
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition ev.h:29
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition util.h:93
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition sllist.h:194
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition sllist.h:257
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition sllist.h:232
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition sllist.h:202
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition sllist.h:243
This is the internal header file of the event library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdatomic....
This header file is part of the C11 and POSIX compatibility library; it includes <stdint....
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
void ev_strand_destroy(ev_exec_t *exec)
Destroys a strand executor.
Definition strand.c:180
ev_exec_t * ev_strand_get_inner_exec(const ev_exec_t *exec)
Returns a pointer to the inner executor of a strand.
Definition strand.c:189
ev_exec_t * ev_strand_create(ev_exec_t *inner_exec)
Creates a strand executor.
Definition strand.c:153
This header file is part of the event library; it contains the strand executor declarations.
An executable task.
Definition task.h:41
ev_task_func_t * func
The function to be invoked when the task is run.
Definition task.h:45
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition task.h:43
A singly-linked list.
Definition sllist.h:52
This header file is part of the event library; it contains the task declarations.
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run...
@ thrd_success
Indicates that the requested operation succeeded.
Definition threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition threads.h:109