Lely core libraries  2.3.4
spscring.c
Go to the documentation of this file.
1 
24 #include "util.h"
25 #include <lely/util/spscring.h>
26 
27 #include <assert.h>
28 #include <stdint.h>
29 
30 static void spscring_pc_signal(struct spscring *ring);
31 static void spscring_cp_signal(struct spscring *ring);
32 
33 static inline size_t spscring_atomic_load(
34  const volatile spscring_atomic_t *object);
35 static inline void spscring_atomic_store(
36  volatile spscring_atomic_t *object, size_t desired);
37 static inline _Bool spscring_atomic_compare_exchange_strong(
38  volatile spscring_atomic_t *object, size_t *expected,
39  size_t desired);
40 static inline _Bool spscring_atomic_compare_exchange_weak(
41  volatile spscring_atomic_t *object, size_t *expected,
42  size_t desired);
43 
44 static inline void spscring_yield(void);
45 
46 void
47 spscring_init(struct spscring *ring, size_t size)
48 {
49  assert(ring);
50  assert(size <= SIZE_MAX / 2 + 1);
51 
52  *ring = (struct spscring)SPSCRING_INIT(size);
53 }
54 
55 size_t
56 spscring_size(const struct spscring *ring)
57 {
58  assert(ring);
59  assert(ring->p.ctx.size == ring->c.ctx.size);
60 
61  return ring->p.ctx.size;
62 }
63 
64 size_t
66 {
67  assert(ring);
68  struct spscring_ctx *ctx = &ring->p.ctx;
69  assert(ctx->size <= SIZE_MAX / 2 + 1);
70  assert(ctx->pos < ctx->size);
71  assert(ctx->pos <= ctx->end);
72  assert(ctx->end - ctx->pos <= ctx->size);
73 
74  size_t cpos = spscring_atomic_load(&ring->c.pos) + ctx->size;
75  cpos -= ctx->base;
76  ctx->end = cpos;
77  assert(ctx->pos <= ctx->end);
78  assert(ctx->end - ctx->pos <= ctx->size);
79  return ctx->end - ctx->pos;
80 }
81 
82 size_t
84 {
85  assert(ring);
86  struct spscring_ctx *ctx = &ring->p.ctx;
87  assert(ctx->size <= SIZE_MAX / 2 + 1);
88  assert(ctx->pos < ctx->size);
89  assert(ctx->pos <= ctx->end);
90  assert(ctx->end - ctx->pos <= ctx->size);
91 
92  if (ctx->end < ctx->size)
93  spscring_p_capacity(ring);
94  return MIN(ctx->end, ctx->size) - ctx->pos;
95 }
96 
97 size_t
98 spscring_p_alloc(struct spscring *ring, size_t *psize)
99 {
100  assert(ring);
101  struct spscring_ctx *ctx = &ring->p.ctx;
102  assert(ctx->size <= SIZE_MAX / 2 + 1);
103  assert(ctx->pos < ctx->size);
104  assert(ctx->pos <= ctx->end);
105  assert(ctx->end - ctx->pos <= ctx->size);
106  assert(psize);
107 
108  size_t capacity = ctx->end - ctx->pos;
109  if (*psize > capacity) {
110  capacity = spscring_p_capacity(ring);
111  if (*psize > capacity)
112  *psize = capacity;
113  }
114  return ctx->pos;
115 }
116 
117 size_t
118 spscring_p_alloc_no_wrap(struct spscring *ring, size_t *psize)
119 {
120  assert(ring);
121  struct spscring_ctx *ctx = &ring->p.ctx;
122  assert(ctx->size <= SIZE_MAX / 2 + 1);
123  assert(ctx->pos < ctx->size);
124  assert(ctx->pos <= ctx->end);
125  assert(ctx->end - ctx->pos <= ctx->size);
126  assert(psize);
127 
128  size_t capacity = MIN(ctx->end, ctx->size) - ctx->pos;
129  if (*psize > capacity) {
130  capacity = spscring_p_capacity_no_wrap(ring);
131  if (*psize > capacity)
132  *psize = capacity;
133  }
134  return ctx->pos;
135 }
136 
137 size_t
138 spscring_p_commit(struct spscring *ring, size_t size)
139 {
140  assert(ring);
141  struct spscring_ctx *ctx = &ring->p.ctx;
142  assert(ctx->size <= SIZE_MAX / 2 + 1);
143  assert(ctx->pos < ctx->size);
144  assert(ctx->pos <= ctx->end);
145  assert(ctx->end - ctx->pos <= ctx->size);
146  assert(size <= ctx->end - ctx->pos);
147 
148  if ((ctx->pos += size) >= ctx->size) {
149  ctx->base += ctx->size;
150  ctx->pos -= ctx->size;
151  assert(ctx->end >= ctx->size);
152  ctx->end -= ctx->size;
153  }
154 
155  spscring_atomic_store(&ring->p.pos, ctx->base + ctx->pos);
156 
157  spscring_pc_signal(ring);
158 
159  return ctx->pos;
160 }
161 
162 int
163 spscring_p_submit_wait(struct spscring *ring, size_t size,
164  void (*func)(struct spscring *ring, void *arg), void *arg)
165 {
166  assert(ring);
167  struct spscring_ctx *ctx = &ring->p.ctx;
168  assert(ctx->size <= SIZE_MAX / 2 + 1);
169  struct spscring_sig *sig = &ring->p.sig;
170 
171  if (size > ctx->size)
172  size = ctx->size;
173 
174  // Abort the previous wait, if any.
175  spscring_p_abort_wait(ring);
176 
177  sig->func = func;
178  sig->arg = arg;
179 
180  // Check if the wait condition is already satisfied.
181  if (size <= spscring_p_capacity(ring))
182  return 0;
183 
184  // Register the wait with the consumer.
185  spscring_atomic_store(&sig->size, size);
186 
187  // Check if a concurrent consumer has already satisfied the wait
188  // condition.
189  if (size <= spscring_p_capacity(ring)) {
190  // Try to stop the signal function from being invoked.
191  // clang-format off
192  if (spscring_atomic_compare_exchange_strong(
193  &sig->size, &size, 0))
194  // clang-format on
195  return 0;
196  // The consumer has already begun invoking the signal function.
197  assert(size == 0 || size == SIZE_MAX);
198  }
199 
200  return 1;
201 }
202 
203 int
205 {
206  assert(ring);
207  struct spscring_sig *sig = &ring->p.sig;
208 
209  size_t size = spscring_atomic_load(&sig->size);
210  while (size) {
211  // Abort the wait, unless spscring_cp_signal() just started
212  // reading the function pointer and argument.
213  if (size == SIZE_MAX) {
214  spscring_yield();
215  // Busy-wait until spscring_cp_signal() is done reading
216  // the function pointer and argument.
217  size = spscring_atomic_load(&sig->size);
218  // clang-format off
219  } else if (spscring_atomic_compare_exchange_weak(
220  &sig->size, &size, 0)) {
221  // clang-format on
222  return 1;
223  }
224  }
225  return 0;
226 }
227 
228 size_t
230 {
231  assert(ring);
232  struct spscring_ctx *ctx = &ring->c.ctx;
233  assert(ctx->size <= SIZE_MAX / 2 + 1);
234  assert(ctx->pos < ctx->size);
235  assert(ctx->pos <= ctx->end);
236  assert(ctx->end - ctx->pos <= ctx->size);
237 
238  size_t ppos = spscring_atomic_load(&ring->p.pos);
239  ppos -= ctx->base;
240  ctx->end = ppos;
241  assert(ctx->pos <= ppos);
242  assert(ctx->end - ctx->pos <= ctx->size);
243  return ctx->end - ctx->pos;
244 }
245 
246 size_t
248 {
249  assert(ring);
250  struct spscring_ctx *ctx = &ring->c.ctx;
251  assert(ctx->size <= SIZE_MAX / 2 + 1);
252  assert(ctx->pos < ctx->size);
253  assert(ctx->pos <= ctx->end);
254  assert(ctx->end - ctx->pos <= ctx->size);
255 
256  if (ctx->end < ctx->size)
257  spscring_c_capacity(ring);
258  return MIN(ctx->end, ctx->size) - ctx->pos;
259 }
260 
261 size_t
262 spscring_c_alloc(struct spscring *ring, size_t *psize)
263 {
264  assert(ring);
265  struct spscring_ctx *ctx = &ring->c.ctx;
266  assert(ctx->size <= SIZE_MAX / 2 + 1);
267  assert(ctx->pos < ctx->size);
268  assert(ctx->pos <= ctx->end);
269  assert(ctx->end - ctx->pos <= ctx->size);
270  assert(psize);
271 
272  size_t capacity = ctx->end - ctx->pos;
273  if (*psize > capacity) {
274  capacity = spscring_c_capacity(ring);
275  if (*psize > capacity)
276  *psize = capacity;
277  }
278  return ctx->pos;
279 }
280 
281 size_t
282 spscring_c_alloc_no_wrap(struct spscring *ring, size_t *psize)
283 {
284  assert(ring);
285  struct spscring_ctx *ctx = &ring->c.ctx;
286  assert(ctx->size <= SIZE_MAX / 2 + 1);
287  assert(ctx->pos < ctx->size);
288  assert(ctx->pos <= ctx->end);
289  assert(ctx->end - ctx->pos <= ctx->size);
290  assert(psize);
291 
292  size_t capacity = MIN(ctx->end, ctx->size) - ctx->pos;
293  if (*psize > capacity) {
294  capacity = spscring_c_capacity_no_wrap(ring);
295  if (*psize > capacity)
296  *psize = capacity;
297  }
298  return ctx->pos;
299 }
300 
301 size_t
302 spscring_c_commit(struct spscring *ring, size_t size)
303 {
304  assert(ring);
305  struct spscring_ctx *ctx = &ring->c.ctx;
306  assert(ctx->size <= SIZE_MAX / 2 + 1);
307  assert(ctx->pos < ctx->size);
308  assert(ctx->pos <= ctx->end);
309  assert(ctx->end - ctx->pos <= ctx->size);
310  assert(size <= ctx->end - ctx->pos);
311 
312  if ((ctx->pos += size) >= ctx->size) {
313  ctx->base += ctx->size;
314  ctx->pos -= ctx->size;
315  assert(ctx->end >= ctx->size);
316  ctx->end -= ctx->size;
317  }
318 
319  spscring_atomic_store(&ring->c.pos, ctx->base + ctx->pos);
320 
321  spscring_cp_signal(ring);
322 
323  return ctx->pos;
324 }
325 
326 int
327 spscring_c_submit_wait(struct spscring *ring, size_t size,
328  void (*func)(struct spscring *ring, void *arg), void *arg)
329 {
330  assert(ring);
331  struct spscring_ctx *ctx = &ring->c.ctx;
332  assert(ctx->size <= SIZE_MAX / 2 + 1);
333  struct spscring_sig *sig = &ring->c.sig;
334 
335  if (size > ctx->size)
336  size = ctx->size;
337 
338  // Abort the previous wait, if any.
339  spscring_c_abort_wait(ring);
340 
341  sig->func = func;
342  sig->arg = arg;
343 
344  // Check if the wait condition is already satisfied.
345  if (size <= spscring_c_capacity(ring))
346  return 0;
347 
348  // Register the wait with the producer.
349  spscring_atomic_store(&sig->size, size);
350 
351  // Check if a concurrent producer has already satisfied the wait
352  // condition.
353  if (size <= spscring_c_capacity(ring)) {
354  // Try to stop the signal function from being invoked.
355  // clang-format off
356  if (spscring_atomic_compare_exchange_strong(
357  &sig->size, &size, 0))
358  // clang-format on
359  return 0;
360  // The producer has already begun invoking the signal function.
361  assert(size == 0 || size == SIZE_MAX);
362  }
363 
364  return 1;
365 }
366 
367 int
369 {
370  assert(ring);
371  struct spscring_sig *sig = &ring->c.sig;
372 
373  size_t size = spscring_atomic_load(&sig->size);
374  while (size) {
375  // Abort the wait, unless spscring_pc_signal() just started
376  // reading the function pointer and argument.
377  if (size == SIZE_MAX) {
378  spscring_yield();
379  // Busy-wait until spscring_pc_signal() is done reading
380  // the function pointer and argument.
381  size = spscring_atomic_load(&sig->size);
382  // clang-format off
383  } else if (spscring_atomic_compare_exchange_weak(
384  &sig->size, &size, 0)) {
385  // clang-format on
386  return 1;
387  }
388  }
389  return 0;
390 }
391 
392 static void
393 spscring_pc_signal(struct spscring *ring)
394 {
395  assert(ring);
396  struct spscring_ctx *ctx = &ring->p.ctx;
397  struct spscring_sig *sig = &ring->c.sig;
398 
399  // Check if the consumer has registered a wait.
400  size_t size = spscring_atomic_load(&sig->size);
401  while (size) {
402  // Abort if the wait condition is not satisfied.
403  if (size > ctx->size - spscring_p_capacity(ring))
404  return;
405 
406  // Try to set the requested size to the special value SIZE_MAX
407  // to notify spscring_c_abort_wait() that we're about to read
408  // the function pointer and argument.
409  // clang-format off
410  if (spscring_atomic_compare_exchange_weak(
411  &sig->size, &size, SIZE_MAX)) {
412  // clang-format on
413  // Copy the function pointer and argument to allow the
414  // consumer to register a different signal function for
415  // the next wait.
416  void (*func)(struct spscring * ring, void *arg) =
417  sig->func;
418  void *arg = sig->arg;
419 
420  // Set the requested size to 0 to indicate that the wait
421  // condition was satisfied.
422  spscring_atomic_store(&sig->size, 0);
423 
424  // Invoke the signal function, if specified.
425  if (func)
426  func(ring, arg);
427  return;
428  }
429 
430  spscring_yield();
431  }
432 }
433 
434 static void
435 spscring_cp_signal(struct spscring *ring)
436 {
437  assert(ring);
438  struct spscring_ctx *ctx = &ring->c.ctx;
439  struct spscring_sig *sig = &ring->p.sig;
440 
441  // Check if the producer has registered a wait.
442  size_t size = spscring_atomic_load(&sig->size);
443  while (size) {
444  // Abort if the wait condition is not satisfied.
445  if (size > ctx->size - spscring_c_capacity(ring))
446  return;
447 
448  // Try to set the requested size to the special value SIZE_MAX
449  // to notify spscring_p_abort_wait() that we're about to read
450  // the function pointer and argument.
451  // clang-format off
452  if (spscring_atomic_compare_exchange_weak(
453  &sig->size, &size, SIZE_MAX)) {
454  // clang-format on
455  // Copy the function pointer and argument to allow the
456  // producer to register a different signal function for
457  // the next wait.
458  void (*func)(struct spscring * ring, void *arg) =
459  sig->func;
460  void *arg = sig->arg;
461 
462  // Set the requested size to 0 to indicate that the wait
463  // condition was satisfied.
464  spscring_atomic_store(&sig->size, 0);
465 
466  // Invoke the signal function, if specified.
467  if (func)
468  func(ring, arg);
469 
470  return;
471  }
472 
473  spscring_yield();
474  }
475 }
476 
477 static inline size_t
478 spscring_atomic_load(const volatile spscring_atomic_t *object)
479 {
480 #if LELY_NO_ATOMICS
481  return *object;
482 #else
484 #endif
485 }
486 
487 static inline void
488 spscring_atomic_store(volatile spscring_atomic_t *object, size_t desired)
489 {
490 #if LELY_NO_ATOMICS
491  *object = desired;
492 #else
494 #endif
495 }
496 
497 static inline _Bool
498 spscring_atomic_compare_exchange_strong(volatile spscring_atomic_t *object,
499  size_t *expected, size_t desired)
500 {
501 #if LELY_NO_ATOMICS
502 #if _WIN32
503 #if _WIN64
504  size_t tmp = InterlockedCompareExchange64(
505  (volatile LONGLONG *)object, desired, *expected);
506 #else
507  size_t tmp = InterlockedCompareExchange(
508  (volatile LONG *)object, desired, *expected);
509 #endif
510  _Bool result = tmp == *expected;
511  if (!result)
512  *expected = tmp;
513  return result;
514 #else
515  assert(*object == *expected);
516  (void)expected;
517  *object = desired;
518  return 1;
519 #endif
520 #else
521  return atomic_compare_exchange_strong_explicit(object, expected,
523 #endif
524 }
525 
526 static inline _Bool
527 spscring_atomic_compare_exchange_weak(volatile spscring_atomic_t *object,
528  size_t *expected, size_t desired)
529 {
530 #if LELY_NO_ATOMICS
531  return spscring_atomic_compare_exchange_strong(
532  object, expected, desired);
533 #else
534  return atomic_compare_exchange_weak_explicit(object, expected, desired,
536 #endif
537 }
538 
539 static inline void
540 spscring_yield(void)
541 {
542 #if _WIN32
543  YieldProcessor();
544 #elif (defined(__i386__) || defined(__x86_64__)) \
545  && (GNUC_PREREQ(4, 7) || __has_builtin(__builtin_ia32_pause))
546  __builtin_ia32_pause();
547 #endif
548 }
#define MIN(a, b)
Returns the minimum of a and b.
Definition: util.h:57
int spscring_p_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_p_submit_wait().
Definition: spscring.c:204
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:262
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:327
size_t spscring_p_capacity(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer,...
Definition: spscring.c:65
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition: spscring.c:138
int spscring_p_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:163
size_t spscring_c_capacity_no_wrap(struct spscring *ring)
Returns the total capacity available for a consumer in a single-producer single-consumer ring buffer,...
Definition: spscring.c:247
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition: spscring.c:302
size_t spscring_c_capacity(struct spscring *ring)
Returns the total capacity available for a consumer in a single-producer single-consumer ring buffer,...
Definition: spscring.c:229
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition: spscring.c:368
size_t spscring_p_capacity_no_wrap(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer,...
Definition: spscring.c:83
size_t spscring_c_alloc_no_wrap(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, without wrapping, in a single-producer, single-consumer rin...
Definition: spscring.c:282
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition: spscring.c:47
size_t spscring_p_alloc_no_wrap(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, without wrapping, in a single-producer, single-consumer rin...
Definition: spscring.c:118
size_t spscring_size(const struct spscring *ring)
Returns the size of a single-producer, single-consumer ring buffer.
Definition: spscring.c:56
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition: spscring.c:98
This header file is part of the utilities library; it contains the single-producer,...
#define SPSCRING_INIT(size)
The static initializer for spscring.
Definition: spscring.h:97
This is the internal header file of the utilities library.
@ memory_order_release
A store operation performs a release operation on the affected memory location.
Definition: stdatomic.h:158
@ memory_order_acq_rel
A load operation performs an acquire operation on the affected memory location, and a store operation...
Definition: stdatomic.h:168
@ memory_order_acquire
A load operation performs an acquire operation on the affected memory location.
Definition: stdatomic.h:149
#define atomic_compare_exchange_strong_explicit(object, expected, desired, success, failure)
Atomically compares the value at object for equality with that at expected, and if true,...
Definition: stdatomic.h:405
#define atomic_load_explicit(object, order)
Atomically returns the value at object.
Definition: stdatomic.h:344
#define atomic_store_explicit(object, desired, order)
Atomically replaces the value at object with the value of desired.
Definition: stdatomic.h:325
#define atomic_compare_exchange_weak_explicit(object, expected, desired, success, failure)
Equivalent to atomic_compare_exchange_strong_explicit(), except that a weak compare-and-exchange may ...
Definition: stdatomic.h:452
This header file is part of the C11 and POSIX compatibility library; it includes <stdint....
A single-producer, single-consumer ring buffer.
Definition: spscring.h:63