Lely core libraries  2.2.5
strand.c
Go to the documentation of this file.
1 
24 #include "ev.h"
25 #if !LELY_NO_THREADS
26 #include <lely/libc/stdatomic.h>
27 #include <lely/libc/threads.h>
28 #endif
29 #include <lely/ev/exec.h>
30 #include <lely/ev/strand.h>
31 #include <lely/ev/task.h>
32 #include <lely/util/errnum.h>
33 #include <lely/util/util.h>
34 
35 #include <assert.h>
36 #include <stdint.h>
37 #include <stdlib.h>
38 
39 static void ev_strand_exec_on_task_init(ev_exec_t *exec);
40 static void ev_strand_exec_on_task_fini(ev_exec_t *exec);
41 static int ev_strand_exec_dispatch(ev_exec_t *exec, struct ev_task *task);
42 static void ev_strand_exec_defer(ev_exec_t *exec, struct ev_task *task);
43 static size_t ev_strand_exec_abort(ev_exec_t *exec, struct ev_task *task);
44 
45 // clang-format off
46 static const struct ev_exec_vtbl ev_strand_exec_vtbl = {
47  &ev_strand_exec_on_task_init,
48  &ev_strand_exec_on_task_fini,
49  &ev_strand_exec_dispatch,
50  &ev_strand_exec_defer,
51  &ev_strand_exec_defer,
52  &ev_strand_exec_abort,
53  NULL
54 };
55 // clang-format on
56 
57 struct ev_strand {
58  const struct ev_exec_vtbl *exec_vptr;
59  ev_exec_t *inner_exec;
60  struct ev_task task;
61 #if !LELY_NO_THREADS
62  mtx_t mtx;
63 #endif
64  int posted;
65  struct sllist queue;
66  const int *thr;
67 };
68 
69 static void ev_strand_func(struct ev_task *task);
70 
71 static inline struct ev_strand *ev_strand_from_exec(const ev_exec_t *exec);
72 
73 #if LELY_NO_THREADS
74 static const int ev_strand_thrd;
75 #else
76 static _Thread_local const int ev_strand_thrd;
77 #endif
78 
79 void *
80 ev_strand_alloc(void)
81 {
82  struct ev_strand *strand = malloc(sizeof(*strand));
83  if (!strand)
84  set_errc(errno2c(errno));
85  // cppcheck-suppress memleak symbolName=strand
86  return strand ? &strand->exec_vptr : NULL;
87 }
88 
89 void
90 ev_strand_free(void *ptr)
91 {
92  free(ptr);
93 }
94 
95 ev_exec_t *
96 ev_strand_init(ev_exec_t *exec, ev_exec_t *inner_exec)
97 {
98  struct ev_strand *strand = ev_strand_from_exec(exec);
99  assert(inner_exec);
100 
101  strand->exec_vptr = &ev_strand_exec_vtbl;
102 
103  strand->inner_exec = inner_exec;
104  strand->task = (struct ev_task)EV_TASK_INIT(
105  strand->inner_exec, &ev_strand_func);
106 
107 #if !LELY_NO_THREADS
108  if (mtx_init(&strand->mtx, mtx_plain) != thrd_success)
109  return NULL;
110 #endif
111 
112  strand->posted = 0;
113 
114  sllist_init(&strand->queue);
115 
116  strand->thr = NULL;
117 
118  return exec;
119 }
120 
121 void
122 ev_strand_fini(ev_exec_t *exec)
123 {
124  struct ev_strand *strand = ev_strand_from_exec(exec);
125 
126  ev_strand_exec_abort(exec, NULL);
127 
128 #if !LELY_NO_THREADS
129  mtx_lock(&strand->mtx);
130 #endif
131  // Abort ev_strand_func().
132  if (strand->posted && ev_exec_abort(strand->task.exec, &strand->task))
133  strand->posted = 0;
134 #if !LELY_NO_THREADS
135  // If necessary, busy-wait until ev_strand_func() completes.
136  while (strand->posted) {
137  mtx_unlock(&strand->mtx);
138  thrd_yield();
139  mtx_lock(&strand->mtx);
140  }
141  mtx_unlock(&strand->mtx);
142 
143  mtx_destroy(&strand->mtx);
144 #endif
145 }
146 
147 ev_exec_t *
149 {
150  int errc = 0;
151 
152  ev_exec_t *exec = ev_strand_alloc();
153  if (!exec) {
154  errc = get_errc();
155  goto error_alloc;
156  }
157 
158  ev_exec_t *tmp = ev_strand_init(exec, inner_exec);
159  if (!tmp) {
160  errc = get_errc();
161  goto error_init;
162  }
163  exec = tmp;
164 
165  return exec;
166 
167 error_alloc:
168  ev_strand_free((void *)exec);
169 error_init:
170  set_errc(errc);
171  return NULL;
172 }
173 
174 void
176 {
177  if (exec) {
178  ev_strand_fini(exec);
179  ev_strand_free((void *)exec);
180  }
181 }
182 
183 ev_exec_t *
185 {
186  struct ev_strand *strand = ev_strand_from_exec(exec);
187 
188  return strand->inner_exec;
189 }
190 
191 static void
192 ev_strand_exec_on_task_init(ev_exec_t *exec)
193 {
194  struct ev_strand *strand = ev_strand_from_exec(exec);
195 
196  ev_exec_on_task_init(strand->inner_exec);
197 }
198 
199 static void
200 ev_strand_exec_on_task_fini(ev_exec_t *exec)
201 {
202  struct ev_strand *strand = ev_strand_from_exec(exec);
203 
204  ev_exec_on_task_fini(strand->inner_exec);
205 }
206 
207 static int
208 ev_strand_exec_dispatch(ev_exec_t *exec, struct ev_task *task)
209 {
210  struct ev_strand *strand = ev_strand_from_exec(exec);
211  assert(task);
212  assert(!task->exec || task->exec == exec);
213 
214  if (!task->exec)
215  task->exec = exec;
216  ev_strand_exec_on_task_init(exec);
217 
218 #if !LELY_NO_THREADS
219  mtx_lock(&strand->mtx);
220 #endif
221  if (strand->thr == &ev_strand_thrd) {
222 #if !LELY_NO_THREADS
223  mtx_unlock(&strand->mtx);
224 #endif
225  if (task->func)
226  task->func(task);
227  ev_strand_exec_on_task_fini(exec);
228  return 1;
229  } else {
230  sllist_push_back(&strand->queue, &task->_node);
231  int post = !strand->posted;
232  strand->posted = 1;
233 #if !LELY_NO_THREADS
234  mtx_unlock(&strand->mtx);
235 #endif
236  if (post)
237  ev_exec_post(strand->task.exec, &strand->task);
238  return 0;
239  }
240 }
241 
242 static void
243 ev_strand_exec_defer(ev_exec_t *exec, struct ev_task *task)
244 {
245  struct ev_strand *strand = ev_strand_from_exec(exec);
246  assert(task);
247  assert(!task->exec || task->exec == exec);
248 
249  if (!task->exec)
250  task->exec = exec;
251  ev_strand_exec_on_task_init(exec);
252 
253 #if !LELY_NO_THREADS
254  mtx_lock(&strand->mtx);
255 #endif
256  sllist_push_back(&strand->queue, &task->_node);
257  int post = !strand->posted;
258  strand->posted = 1;
259 #if !LELY_NO_THREADS
260  mtx_unlock(&strand->mtx);
261 #endif
262  if (post)
263  ev_exec_post(strand->task.exec, &strand->task);
264 }
265 
266 static size_t
267 ev_strand_exec_abort(ev_exec_t *exec, struct ev_task *task)
268 {
269  struct ev_strand *strand = ev_strand_from_exec(exec);
270 
271  struct sllist queue;
272  sllist_init(&queue);
273 
274 #if !LELY_NO_THREADS
275  mtx_lock(&strand->mtx);
276 #endif
277  if (!task)
278  sllist_append(&queue, &strand->queue);
279  else if (sllist_remove(&strand->queue, &task->_node))
280  sllist_push_back(&queue, &task->_node);
281 #if !LELY_NO_THREADS
282  mtx_unlock(&strand->mtx);
283 #endif
284 
285  size_t n = 0;
286  while (sllist_pop_front(&queue)) {
287  ev_strand_exec_on_task_fini(exec);
288  n += n < SIZE_MAX;
289  }
290  return n;
291 }
292 
293 static void
294 ev_strand_func(struct ev_task *task)
295 {
296  assert(task);
297  struct ev_strand *strand = structof(task, struct ev_strand, task);
298  ev_exec_t *exec = &strand->exec_vptr;
299 
300 #if !LELY_NO_THREADS
301  mtx_lock(&strand->mtx);
302 #endif
303  task = ev_task_from_node(sllist_pop_front(&strand->queue));
304  if (task) {
305  assert(!strand->thr);
306  strand->thr = &ev_strand_thrd;
307 #if !LELY_NO_THREADS
308  mtx_unlock(&strand->mtx);
309 #endif
310  assert(task->exec == exec);
311  if (task->func)
312  task->func(task);
313  ev_strand_exec_on_task_fini(exec);
314 #if !LELY_NO_THREADS
315  mtx_lock(&strand->mtx);
316 #endif
317  assert(strand->thr == &ev_strand_thrd);
318  strand->thr = NULL;
319  }
320  assert(strand->posted == 1);
321  int post = strand->posted = !sllist_empty(&strand->queue);
322 #if !LELY_NO_THREADS
323  mtx_unlock(&strand->mtx);
324 #endif
325  if (post)
326  ev_exec_post(strand->task.exec, &strand->task);
327 }
328 
329 static inline struct ev_strand *
330 ev_strand_from_exec(const ev_exec_t *exec)
331 {
332  assert(exec);
333 
334  return structof(exec, struct ev_strand, exec_vptr);
335 }
This header file is part of the utilities library; it contains the native and platform-independent er...
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:947
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:957
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:43
This header file is part of the event library; it contains the abstract task executor interface.
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks...
Definition: exec.h:138
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:126
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:114
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:108
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread stor...
Definition: features.h:239
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
This is the public header file of the utilities library.
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:184
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:233
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:213
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:190
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:221
This is the internal header file of the event library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdatomic....
This header file is part of the C11 and POSIX compatibility library; it includes <stdint....
This header file is part of the C11 and POSIX compatibility library; it includes <stdlib....
void ev_strand_destroy(ev_exec_t *exec)
Destroys a strand executor.
Definition: strand.c:175
ev_exec_t * ev_strand_create(ev_exec_t *inner_exec)
Creates a strand executor.
Definition: strand.c:148
ev_exec_t * ev_strand_get_inner_exec(const ev_exec_t *exec)
Returns a pointer to the inner executor of a strand.
Definition: strand.c:184
This header file is part of the event library; it contains the strand executor declarations.
An executable task.
Definition: task.h:41
ev_task_func_t * func
The function to be invoked when the task is run.
Definition: task.h:45
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
A singly-linked list.
Definition: sllist.h:51
This header file is part of the event library; it contains the task declarations.
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run...
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109