Lely core libraries  2.3.4
strand.c
Go to the documentation of this file.
1 
24 #include "ev.h"
25 
26 #if !LELY_NO_MALLOC
27 
28 #if !LELY_NO_THREADS
29 #include <lely/libc/stdatomic.h>
30 #include <lely/libc/threads.h>
31 #endif
32 #include <lely/ev/exec.h>
33 #include <lely/ev/strand.h>
34 #include <lely/ev/task.h>
35 #include <lely/util/errnum.h>
36 #include <lely/util/util.h>
37 
38 #include <assert.h>
39 #include <stdint.h>
40 #include <stdlib.h>
41 
42 static void ev_strand_exec_on_task_init(ev_exec_t *exec);
43 static void ev_strand_exec_on_task_fini(ev_exec_t *exec);
44 static int ev_strand_exec_dispatch(ev_exec_t *exec, struct ev_task *task);
45 static void ev_strand_exec_defer(ev_exec_t *exec, struct ev_task *task);
46 static size_t ev_strand_exec_abort(ev_exec_t *exec, struct ev_task *task);
47 
48 // clang-format off
49 static const struct ev_exec_vtbl ev_strand_exec_vtbl = {
50  &ev_strand_exec_on_task_init,
51  &ev_strand_exec_on_task_fini,
52  &ev_strand_exec_dispatch,
53  &ev_strand_exec_defer,
54  &ev_strand_exec_defer,
55  &ev_strand_exec_abort,
56  NULL
57 };
58 // clang-format on
59 
60 struct ev_strand {
61  const struct ev_exec_vtbl *exec_vptr;
62  ev_exec_t *inner_exec;
63  struct ev_task task;
64 #if !LELY_NO_THREADS
65  mtx_t mtx;
66 #endif
67  int posted;
68  struct sllist queue;
69  const int *thr;
70 };
71 
72 static void ev_strand_func(struct ev_task *task);
73 
74 static inline struct ev_strand *ev_strand_from_exec(const ev_exec_t *exec);
75 
76 #if LELY_NO_THREADS
77 static const int ev_strand_thrd;
78 #else
79 static _Thread_local const int ev_strand_thrd;
80 #endif
81 
82 void *
83 ev_strand_alloc(void)
84 {
85  struct ev_strand *strand = malloc(sizeof(*strand));
86 #if !LELY_NO_ERRNO
87  if (!strand)
88  set_errc(errno2c(errno));
89 #endif
90  // cppcheck-suppress memleak symbolName=strand
91  return strand ? &strand->exec_vptr : NULL;
92 }
93 
// Frees the memory obtained from ev_strand_alloc().
//
// ptr is the executor handle returned by ev_strand_alloc() (a pointer to the
// exec_vptr member), or NULL, in which case nothing happens.
void
ev_strand_free(void *ptr)
{
	// ev_strand_alloc() returns the address of a member, not of the
	// allocation itself. Map it back to the enclosing struct so the pointer
	// handed to free() is correct wherever exec_vptr sits in the struct.
	if (ptr)
		free(ev_strand_from_exec(ptr));
}
99 
100 ev_exec_t *
101 ev_strand_init(ev_exec_t *exec, ev_exec_t *inner_exec)
102 {
103  struct ev_strand *strand = ev_strand_from_exec(exec);
104  assert(inner_exec);
105 
106  strand->exec_vptr = &ev_strand_exec_vtbl;
107 
108  strand->inner_exec = inner_exec;
109  strand->task = (struct ev_task)EV_TASK_INIT(
110  strand->inner_exec, &ev_strand_func);
111 
112 #if !LELY_NO_THREADS
113  if (mtx_init(&strand->mtx, mtx_plain) != thrd_success)
114  return NULL;
115 #endif
116 
117  strand->posted = 0;
118 
119  sllist_init(&strand->queue);
120 
121  strand->thr = NULL;
122 
123  return exec;
124 }
125 
126 void
127 ev_strand_fini(ev_exec_t *exec)
128 {
129  struct ev_strand *strand = ev_strand_from_exec(exec);
130 
131  ev_strand_exec_abort(exec, NULL);
132 
133 #if !LELY_NO_THREADS
134  mtx_lock(&strand->mtx);
135 #endif
136  // Abort ev_strand_func().
137  if (strand->posted && ev_exec_abort(strand->task.exec, &strand->task))
138  strand->posted = 0;
139 #if !LELY_NO_THREADS
140  // If necessary, busy-wait until ev_strand_func() completes.
141  while (strand->posted) {
142  mtx_unlock(&strand->mtx);
143  thrd_yield();
144  mtx_lock(&strand->mtx);
145  }
146  mtx_unlock(&strand->mtx);
147 
148  mtx_destroy(&strand->mtx);
149 #endif
150 }
151 
152 ev_exec_t *
154 {
155  int errc = 0;
156 
157  ev_exec_t *exec = ev_strand_alloc();
158  if (!exec) {
159  errc = get_errc();
160  goto error_alloc;
161  }
162 
163  ev_exec_t *tmp = ev_strand_init(exec, inner_exec);
164  if (!tmp) {
165  errc = get_errc();
166  goto error_init;
167  }
168  exec = tmp;
169 
170  return exec;
171 
172 error_alloc:
173  ev_strand_free((void *)exec);
174 error_init:
175  set_errc(errc);
176  return NULL;
177 }
178 
179 void
181 {
182  if (exec) {
183  ev_strand_fini(exec);
184  ev_strand_free((void *)exec);
185  }
186 }
187 
188 ev_exec_t *
190 {
191  struct ev_strand *strand = ev_strand_from_exec(exec);
192 
193  return strand->inner_exec;
194 }
195 
196 static void
197 ev_strand_exec_on_task_init(ev_exec_t *exec)
198 {
199  struct ev_strand *strand = ev_strand_from_exec(exec);
200 
201  ev_exec_on_task_init(strand->inner_exec);
202 }
203 
204 static void
205 ev_strand_exec_on_task_fini(ev_exec_t *exec)
206 {
207  struct ev_strand *strand = ev_strand_from_exec(exec);
208 
209  ev_exec_on_task_fini(strand->inner_exec);
210 }
211 
212 static int
213 ev_strand_exec_dispatch(ev_exec_t *exec, struct ev_task *task)
214 {
215  struct ev_strand *strand = ev_strand_from_exec(exec);
216  assert(task);
217  assert(!task->exec || task->exec == exec);
218 
219  if (!task->exec)
220  task->exec = exec;
221  ev_strand_exec_on_task_init(exec);
222 
223 #if !LELY_NO_THREADS
224  mtx_lock(&strand->mtx);
225 #endif
226  if (strand->thr == &ev_strand_thrd) {
227 #if !LELY_NO_THREADS
228  mtx_unlock(&strand->mtx);
229 #endif
230  if (task->func)
231  task->func(task);
232  ev_strand_exec_on_task_fini(exec);
233  return 1;
234  } else {
235  sllist_push_back(&strand->queue, &task->_node);
236  int post = !strand->posted;
237  strand->posted = 1;
238 #if !LELY_NO_THREADS
239  mtx_unlock(&strand->mtx);
240 #endif
241  if (post)
242  ev_exec_post(strand->task.exec, &strand->task);
243  return 0;
244  }
245 }
246 
247 static void
248 ev_strand_exec_defer(ev_exec_t *exec, struct ev_task *task)
249 {
250  struct ev_strand *strand = ev_strand_from_exec(exec);
251  assert(task);
252  assert(!task->exec || task->exec == exec);
253 
254  if (!task->exec)
255  task->exec = exec;
256  ev_strand_exec_on_task_init(exec);
257 
258 #if !LELY_NO_THREADS
259  mtx_lock(&strand->mtx);
260 #endif
261  sllist_push_back(&strand->queue, &task->_node);
262  int post = !strand->posted;
263  strand->posted = 1;
264 #if !LELY_NO_THREADS
265  mtx_unlock(&strand->mtx);
266 #endif
267  if (post)
268  ev_exec_post(strand->task.exec, &strand->task);
269 }
270 
271 static size_t
272 ev_strand_exec_abort(ev_exec_t *exec, struct ev_task *task)
273 {
274  struct ev_strand *strand = ev_strand_from_exec(exec);
275 
276  struct sllist queue;
277  sllist_init(&queue);
278 
279 #if !LELY_NO_THREADS
280  mtx_lock(&strand->mtx);
281 #endif
282  if (!task)
283  sllist_append(&queue, &strand->queue);
284  else if (sllist_remove(&strand->queue, &task->_node))
285  sllist_push_back(&queue, &task->_node);
286 #if !LELY_NO_THREADS
287  mtx_unlock(&strand->mtx);
288 #endif
289 
290  size_t n = 0;
291  while (sllist_pop_front(&queue)) {
292  ev_strand_exec_on_task_fini(exec);
293  n += n < SIZE_MAX;
294  }
295  return n;
296 }
297 
298 static void
299 ev_strand_func(struct ev_task *task)
300 {
301  assert(task);
302  struct ev_strand *strand = structof(task, struct ev_strand, task);
303  ev_exec_t *exec = &strand->exec_vptr;
304 
305 #if !LELY_NO_THREADS
306  mtx_lock(&strand->mtx);
307 #endif
308  task = ev_task_from_node(sllist_pop_front(&strand->queue));
309  if (task) {
310  assert(!strand->thr);
311  strand->thr = &ev_strand_thrd;
312 #if !LELY_NO_THREADS
313  mtx_unlock(&strand->mtx);
314 #endif
315  assert(task->exec == exec);
316  if (task->func)
317  task->func(task);
318  ev_strand_exec_on_task_fini(exec);
319 #if !LELY_NO_THREADS
320  mtx_lock(&strand->mtx);
321 #endif
322  assert(strand->thr == &ev_strand_thrd);
323  strand->thr = NULL;
324  }
325  assert(strand->posted == 1);
326  int post = strand->posted = !sllist_empty(&strand->queue);
327 #if !LELY_NO_THREADS
328  mtx_unlock(&strand->mtx);
329 #endif
330  if (post)
331  ev_exec_post(strand->task.exec, &strand->task);
332 }
333 
334 static inline struct ev_strand *
335 ev_strand_from_exec(const ev_exec_t *exec)
336 {
337  assert(exec);
338 
339  return structof(exec, struct ev_strand, exec_vptr);
340 }
341 
342 #endif // !LELY_NO_MALLOC
ev_task_from_node
struct ev_task * ev_task_from_node(struct slnode *node)
Converts a pointer to a node in a queue to the address of the task containing the node.
Definition: task.c:32
ev_exec_t
const struct ev_exec_vtbl *const ev_exec_t
An abstract task executor.
Definition: ev.h:29
task.h
mtx_destroy
void mtx_destroy(mtx_t *mtx)
Releases any resources used by the mutex at mtx.
Definition: threads-pthread.c:113
sllist_remove
struct slnode * sllist_remove(struct sllist *list, struct slnode *node)
Removes a node from a singly-linked list.
Definition: sllist.c:46
strand.h
ev_strand_destroy
void ev_strand_destroy(ev_exec_t *exec)
Destroys a strand executor.
Definition: strand.c:180
threads.h
util.h
mtx_lock
int mtx_lock(mtx_t *mtx)
Blocks until it locks the mutex at mtx.
Definition: threads-pthread.c:150
get_errc
int get_errc(void)
Returns the last (thread-specific) native error code set by a system call or library function.
Definition: errnum.c:932
errno2c
int errno2c(int errnum)
Transforms a standard C error number to a native error code.
Definition: errnum.c:46
exec.h
ev_task::func
ev_task_func_t * func
The function to be invoked when the task is run.
Definition: task.h:45
sllist_append
struct sllist * sllist_append(struct sllist *dst, struct sllist *src)
Appends the singly-linked list at src to the one at dst.
Definition: sllist.h:257
mtx_t
pthread_mutex_t mtx_t
A complete object type that holds an identifier for a mutex.
Definition: threads.h:102
thrd_success
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121
mtx_unlock
int mtx_unlock(mtx_t *mtx)
Unlocks the mutex at mtx.
Definition: threads-pthread.c:183
set_errc
void set_errc(int errc)
Sets the current (thread-specific) native error code to errc.
Definition: errnum.c:944
ev_exec_vtbl
Definition: exec.h:37
sllist_empty
int sllist_empty(const struct sllist *list)
Returns 1 if the singly-linked list is empty, and 0 if not.
Definition: sllist.h:202
errnum.h
ev_strand_create
ev_exec_t * ev_strand_create(ev_exec_t *inner_exec)
Creates a strand executor.
Definition: strand.c:153
ev_exec_on_task_fini
void ev_exec_on_task_fini(ev_exec_t *exec)
Undoes the effect of a previous call to ev_exec_on_task_init().
Definition: exec.h:112
ev_strand_get_inner_exec
ev_exec_t * ev_strand_get_inner_exec(const ev_exec_t *exec)
Returns a pointer to the inner executor of a strand.
Definition: strand.c:189
stdint.h
sllist_init
void sllist_init(struct sllist *list)
Initializes a singly-linked list.
Definition: sllist.h:194
sllist_pop_front
struct slnode * sllist_pop_front(struct sllist *list)
Pops a node from the front of a singly-linked list.
Definition: sllist.h:243
ev_exec_abort
size_t ev_exec_abort(ev_exec_t *exec, struct ev_task *task)
Aborts the specified task submitted to *exec, if it has not yet begun executing, or all pending tasks if task is NULL.
Definition: exec.h:136
ev_strand
Definition: strand.c:60
thrd_yield
void thrd_yield(void)
Endeavors to permit other threads to run, even if the current thread would ordinarily continue to run.
Definition: threads-pthread.c:277
sllist
A singly-linked list.
Definition: sllist.h:52
stdatomic.h
structof
#define structof(ptr, type, member)
Obtains the address of a structure from the address of one of its members.
Definition: util.h:93
ev_task
An executable task.
Definition: task.h:41
ev.h
EV_TASK_INIT
#define EV_TASK_INIT(exec, func)
The static initializer for ev_task.
Definition: task.h:53
_Thread_local
#define _Thread_local
An object whose identifier is declared with the storage-class specifier _Thread_local has thread storage duration.
Definition: features.h:249
ev_exec_on_task_init
void ev_exec_on_task_init(ev_exec_t *exec)
Indicates to the specified executor that a task will be submitted for execution in the future.
Definition: exec.h:106
ev_exec_post
void ev_exec_post(ev_exec_t *exec, struct ev_task *task)
Submits *task to *exec for execution.
Definition: exec.h:124
stdlib.h
sllist_push_back
void sllist_push_back(struct sllist *list, struct slnode *node)
Pushes a node to the back of a singly-linked list.
Definition: sllist.h:232
mtx_init
int mtx_init(mtx_t *mtx, int type)
Creates a mutex object with properties indicated by type, which must have one of the four values:
Definition: threads-pthread.c:119
ev_task::exec
ev_exec_t * exec
A pointer to the executor to which the task is (to be) submitted.
Definition: task.h:43
mtx_plain
@ mtx_plain
A mutex type that supports neither timeout nor test and return.
Definition: threads.h:109