Lely core libraries 2.3.4
spscring.c
Go to the documentation of this file.
1
24#include "util.h"
25#include <lely/util/spscring.h>
26
27#include <assert.h>
28#include <stdint.h>
29
30static void spscring_pc_signal(struct spscring *ring);
31static void spscring_cp_signal(struct spscring *ring);
32
33static inline size_t spscring_atomic_load(
34 const volatile spscring_atomic_t *object);
35static inline void spscring_atomic_store(
36 volatile spscring_atomic_t *object, size_t desired);
37static inline _Bool spscring_atomic_compare_exchange_strong(
38 volatile spscring_atomic_t *object, size_t *expected,
39 size_t desired);
40static inline _Bool spscring_atomic_compare_exchange_weak(
41 volatile spscring_atomic_t *object, size_t *expected,
42 size_t desired);
43
44static inline void spscring_yield(void);
45
46void
47spscring_init(struct spscring *ring, size_t size)
48{
49 assert(ring);
50 assert(size <= SIZE_MAX / 2 + 1);
51
52 *ring = (struct spscring)SPSCRING_INIT(size);
53}
54
55size_t
56spscring_size(const struct spscring *ring)
57{
58 assert(ring);
59 assert(ring->p.ctx.size == ring->c.ctx.size);
60
61 return ring->p.ctx.size;
62}
63
64size_t
66{
67 assert(ring);
68 struct spscring_ctx *ctx = &ring->p.ctx;
69 assert(ctx->size <= SIZE_MAX / 2 + 1);
70 assert(ctx->pos < ctx->size);
71 assert(ctx->pos <= ctx->end);
72 assert(ctx->end - ctx->pos <= ctx->size);
73
74 size_t cpos = spscring_atomic_load(&ring->c.pos) + ctx->size;
75 cpos -= ctx->base;
76 ctx->end = cpos;
77 assert(ctx->pos <= ctx->end);
78 assert(ctx->end - ctx->pos <= ctx->size);
79 return ctx->end - ctx->pos;
80}
81
82size_t
84{
85 assert(ring);
86 struct spscring_ctx *ctx = &ring->p.ctx;
87 assert(ctx->size <= SIZE_MAX / 2 + 1);
88 assert(ctx->pos < ctx->size);
89 assert(ctx->pos <= ctx->end);
90 assert(ctx->end - ctx->pos <= ctx->size);
91
92 if (ctx->end < ctx->size)
94 return MIN(ctx->end, ctx->size) - ctx->pos;
95}
96
97size_t
98spscring_p_alloc(struct spscring *ring, size_t *psize)
99{
100 assert(ring);
101 struct spscring_ctx *ctx = &ring->p.ctx;
102 assert(ctx->size <= SIZE_MAX / 2 + 1);
103 assert(ctx->pos < ctx->size);
104 assert(ctx->pos <= ctx->end);
105 assert(ctx->end - ctx->pos <= ctx->size);
106 assert(psize);
107
108 size_t capacity = ctx->end - ctx->pos;
109 if (*psize > capacity) {
110 capacity = spscring_p_capacity(ring);
111 if (*psize > capacity)
112 *psize = capacity;
113 }
114 return ctx->pos;
115}
116
117size_t
118spscring_p_alloc_no_wrap(struct spscring *ring, size_t *psize)
119{
120 assert(ring);
121 struct spscring_ctx *ctx = &ring->p.ctx;
122 assert(ctx->size <= SIZE_MAX / 2 + 1);
123 assert(ctx->pos < ctx->size);
124 assert(ctx->pos <= ctx->end);
125 assert(ctx->end - ctx->pos <= ctx->size);
126 assert(psize);
127
128 size_t capacity = MIN(ctx->end, ctx->size) - ctx->pos;
129 if (*psize > capacity) {
130 capacity = spscring_p_capacity_no_wrap(ring);
131 if (*psize > capacity)
132 *psize = capacity;
133 }
134 return ctx->pos;
135}
136
137size_t
138spscring_p_commit(struct spscring *ring, size_t size)
139{
140 assert(ring);
141 struct spscring_ctx *ctx = &ring->p.ctx;
142 assert(ctx->size <= SIZE_MAX / 2 + 1);
143 assert(ctx->pos < ctx->size);
144 assert(ctx->pos <= ctx->end);
145 assert(ctx->end - ctx->pos <= ctx->size);
146 assert(size <= ctx->end - ctx->pos);
147
148 if ((ctx->pos += size) >= ctx->size) {
149 ctx->base += ctx->size;
150 ctx->pos -= ctx->size;
151 assert(ctx->end >= ctx->size);
152 ctx->end -= ctx->size;
153 }
154
155 spscring_atomic_store(&ring->p.pos, ctx->base + ctx->pos);
156
157 spscring_pc_signal(ring);
158
159 return ctx->pos;
160}
161
162int
163spscring_p_submit_wait(struct spscring *ring, size_t size,
164 void (*func)(struct spscring *ring, void *arg), void *arg)
165{
166 assert(ring);
167 struct spscring_ctx *ctx = &ring->p.ctx;
168 assert(ctx->size <= SIZE_MAX / 2 + 1);
169 struct spscring_sig *sig = &ring->p.sig;
170
171 if (size > ctx->size)
172 size = ctx->size;
173
174 // Abort the previous wait, if any.
176
177 sig->func = func;
178 sig->arg = arg;
179
180 // Check if the wait condition is already satisfied.
181 if (size <= spscring_p_capacity(ring))
182 return 0;
183
184 // Register the wait with the consumer.
185 spscring_atomic_store(&sig->size, size);
186
187 // Check if a concurrent consumer has already satisfied the wait
188 // condition.
189 if (size <= spscring_p_capacity(ring)) {
190 // Try to stop the signal function from being invoked.
191 // clang-format off
192 if (spscring_atomic_compare_exchange_strong(
193 &sig->size, &size, 0))
194 // clang-format on
195 return 0;
196 // The consumer has already begun invoking the signal function.
197 assert(size == 0 || size == SIZE_MAX);
198 }
199
200 return 1;
201}
202
203int
205{
206 assert(ring);
207 struct spscring_sig *sig = &ring->p.sig;
208
209 size_t size = spscring_atomic_load(&sig->size);
210 while (size) {
211 // Abort the wait, unless spscring_cp_signal() just started
212 // reading the function pointer and argument.
213 if (size == SIZE_MAX) {
214 spscring_yield();
215 // Busy-wait until spscring_cp_signal() is done reading
216 // the function pointer and argument.
217 size = spscring_atomic_load(&sig->size);
218 // clang-format off
219 } else if (spscring_atomic_compare_exchange_weak(
220 &sig->size, &size, 0)) {
221 // clang-format on
222 return 1;
223 }
224 }
225 return 0;
226}
227
228size_t
230{
231 assert(ring);
232 struct spscring_ctx *ctx = &ring->c.ctx;
233 assert(ctx->size <= SIZE_MAX / 2 + 1);
234 assert(ctx->pos < ctx->size);
235 assert(ctx->pos <= ctx->end);
236 assert(ctx->end - ctx->pos <= ctx->size);
237
238 size_t ppos = spscring_atomic_load(&ring->p.pos);
239 ppos -= ctx->base;
240 ctx->end = ppos;
241 assert(ctx->pos <= ppos);
242 assert(ctx->end - ctx->pos <= ctx->size);
243 return ctx->end - ctx->pos;
244}
245
246size_t
248{
249 assert(ring);
250 struct spscring_ctx *ctx = &ring->c.ctx;
251 assert(ctx->size <= SIZE_MAX / 2 + 1);
252 assert(ctx->pos < ctx->size);
253 assert(ctx->pos <= ctx->end);
254 assert(ctx->end - ctx->pos <= ctx->size);
255
256 if (ctx->end < ctx->size)
258 return MIN(ctx->end, ctx->size) - ctx->pos;
259}
260
261size_t
262spscring_c_alloc(struct spscring *ring, size_t *psize)
263{
264 assert(ring);
265 struct spscring_ctx *ctx = &ring->c.ctx;
266 assert(ctx->size <= SIZE_MAX / 2 + 1);
267 assert(ctx->pos < ctx->size);
268 assert(ctx->pos <= ctx->end);
269 assert(ctx->end - ctx->pos <= ctx->size);
270 assert(psize);
271
272 size_t capacity = ctx->end - ctx->pos;
273 if (*psize > capacity) {
274 capacity = spscring_c_capacity(ring);
275 if (*psize > capacity)
276 *psize = capacity;
277 }
278 return ctx->pos;
279}
280
281size_t
282spscring_c_alloc_no_wrap(struct spscring *ring, size_t *psize)
283{
284 assert(ring);
285 struct spscring_ctx *ctx = &ring->c.ctx;
286 assert(ctx->size <= SIZE_MAX / 2 + 1);
287 assert(ctx->pos < ctx->size);
288 assert(ctx->pos <= ctx->end);
289 assert(ctx->end - ctx->pos <= ctx->size);
290 assert(psize);
291
292 size_t capacity = MIN(ctx->end, ctx->size) - ctx->pos;
293 if (*psize > capacity) {
294 capacity = spscring_c_capacity_no_wrap(ring);
295 if (*psize > capacity)
296 *psize = capacity;
297 }
298 return ctx->pos;
299}
300
301size_t
302spscring_c_commit(struct spscring *ring, size_t size)
303{
304 assert(ring);
305 struct spscring_ctx *ctx = &ring->c.ctx;
306 assert(ctx->size <= SIZE_MAX / 2 + 1);
307 assert(ctx->pos < ctx->size);
308 assert(ctx->pos <= ctx->end);
309 assert(ctx->end - ctx->pos <= ctx->size);
310 assert(size <= ctx->end - ctx->pos);
311
312 if ((ctx->pos += size) >= ctx->size) {
313 ctx->base += ctx->size;
314 ctx->pos -= ctx->size;
315 assert(ctx->end >= ctx->size);
316 ctx->end -= ctx->size;
317 }
318
319 spscring_atomic_store(&ring->c.pos, ctx->base + ctx->pos);
320
321 spscring_cp_signal(ring);
322
323 return ctx->pos;
324}
325
326int
327spscring_c_submit_wait(struct spscring *ring, size_t size,
328 void (*func)(struct spscring *ring, void *arg), void *arg)
329{
330 assert(ring);
331 struct spscring_ctx *ctx = &ring->c.ctx;
332 assert(ctx->size <= SIZE_MAX / 2 + 1);
333 struct spscring_sig *sig = &ring->c.sig;
334
335 if (size > ctx->size)
336 size = ctx->size;
337
338 // Abort the previous wait, if any.
340
341 sig->func = func;
342 sig->arg = arg;
343
344 // Check if the wait condition is already satisfied.
345 if (size <= spscring_c_capacity(ring))
346 return 0;
347
348 // Register the wait with the producer.
349 spscring_atomic_store(&sig->size, size);
350
351 // Check if a concurrent producer has already satisfied the wait
352 // condition.
353 if (size <= spscring_c_capacity(ring)) {
354 // Try to stop the signal function from being invoked.
355 // clang-format off
356 if (spscring_atomic_compare_exchange_strong(
357 &sig->size, &size, 0))
358 // clang-format on
359 return 0;
360 // The producer has already begun invoking the signal function.
361 assert(size == 0 || size == SIZE_MAX);
362 }
363
364 return 1;
365}
366
367int
369{
370 assert(ring);
371 struct spscring_sig *sig = &ring->c.sig;
372
373 size_t size = spscring_atomic_load(&sig->size);
374 while (size) {
375 // Abort the wait, unless spscring_pc_signal() just started
376 // reading the function pointer and argument.
377 if (size == SIZE_MAX) {
378 spscring_yield();
379 // Busy-wait until spscring_pc_signal() is done reading
380 // the function pointer and argument.
381 size = spscring_atomic_load(&sig->size);
382 // clang-format off
383 } else if (spscring_atomic_compare_exchange_weak(
384 &sig->size, &size, 0)) {
385 // clang-format on
386 return 1;
387 }
388 }
389 return 0;
390}
391
392static void
393spscring_pc_signal(struct spscring *ring)
394{
395 assert(ring);
396 struct spscring_ctx *ctx = &ring->p.ctx;
397 struct spscring_sig *sig = &ring->c.sig;
398
399 // Check if the consumer has registered a wait.
400 size_t size = spscring_atomic_load(&sig->size);
401 while (size) {
402 // Abort if the wait condition is not satisfied.
403 if (size > ctx->size - spscring_p_capacity(ring))
404 return;
405
406 // Try to set the requested size to the special value SIZE_MAX
407 // to notify spscring_c_abort_wait() that we're about to read
408 // the function pointer and argument.
409 // clang-format off
410 if (spscring_atomic_compare_exchange_weak(
411 &sig->size, &size, SIZE_MAX)) {
412 // clang-format on
413 // Copy the function pointer and argument to allow the
414 // consumer to register a different signal function for
415 // the next wait.
416 void (*func)(struct spscring * ring, void *arg) =
417 sig->func;
418 void *arg = sig->arg;
419
420 // Set the requested size to 0 to indicate that the wait
421 // condition was satisfied.
422 spscring_atomic_store(&sig->size, 0);
423
424 // Invoke the signal function, if specified.
425 if (func)
426 func(ring, arg);
427 return;
428 }
429
430 spscring_yield();
431 }
432}
433
434static void
435spscring_cp_signal(struct spscring *ring)
436{
437 assert(ring);
438 struct spscring_ctx *ctx = &ring->c.ctx;
439 struct spscring_sig *sig = &ring->p.sig;
440
441 // Check if the producer has registered a wait.
442 size_t size = spscring_atomic_load(&sig->size);
443 while (size) {
444 // Abort if the wait condition is not satisfied.
445 if (size > ctx->size - spscring_c_capacity(ring))
446 return;
447
448 // Try to set the requested size to the special value SIZE_MAX
449 // to notify spscring_p_abort_wait() that we're about to read
450 // the function pointer and argument.
451 // clang-format off
452 if (spscring_atomic_compare_exchange_weak(
453 &sig->size, &size, SIZE_MAX)) {
454 // clang-format on
455 // Copy the function pointer and argument to allow the
456 // producer to register a different signal function for
457 // the next wait.
458 void (*func)(struct spscring * ring, void *arg) =
459 sig->func;
460 void *arg = sig->arg;
461
462 // Set the requested size to 0 to indicate that the wait
463 // condition was satisfied.
464 spscring_atomic_store(&sig->size, 0);
465
466 // Invoke the signal function, if specified.
467 if (func)
468 func(ring, arg);
469
470 return;
471 }
472
473 spscring_yield();
474 }
475}
476
477static inline size_t
478spscring_atomic_load(const volatile spscring_atomic_t *object)
479{
480#if LELY_NO_ATOMICS
481 return *object;
482#else
483 return atomic_load_explicit(object, memory_order_acquire);
484#endif
485}
486
487static inline void
488spscring_atomic_store(volatile spscring_atomic_t *object, size_t desired)
489{
490#if LELY_NO_ATOMICS
491 *object = desired;
492#else
493 atomic_store_explicit(object, desired, memory_order_release);
494#endif
495}
496
497static inline _Bool
498spscring_atomic_compare_exchange_strong(volatile spscring_atomic_t *object,
499 size_t *expected, size_t desired)
500{
501#if LELY_NO_ATOMICS
502#if _WIN32
503#if _WIN64
504 size_t tmp = InterlockedCompareExchange64(
505 (volatile LONGLONG *)object, desired, *expected);
506#else
507 size_t tmp = InterlockedCompareExchange(
508 (volatile LONG *)object, desired, *expected);
509#endif
510 _Bool result = tmp == *expected;
511 if (!result)
512 *expected = tmp;
513 return result;
514#else
515 assert(*object == *expected);
516 (void)expected;
517 *object = desired;
518 return 1;
519#endif
520#else
521 return atomic_compare_exchange_strong_explicit(object, expected,
522 desired, memory_order_acq_rel, memory_order_acquire);
523#endif
524}
525
526static inline _Bool
527spscring_atomic_compare_exchange_weak(volatile spscring_atomic_t *object,
528 size_t *expected, size_t desired)
529{
530#if LELY_NO_ATOMICS
531 return spscring_atomic_compare_exchange_strong(
532 object, expected, desired);
533#else
534 return atomic_compare_exchange_weak_explicit(object, expected, desired,
535 memory_order_acq_rel, memory_order_acquire);
536#endif
537}
538
539static inline void
540spscring_yield(void)
541{
542#if _WIN32
543 YieldProcessor();
544#elif (defined(__i386__) || defined(__x86_64__)) \
545 && (GNUC_PREREQ(4, 7) || __has_builtin(__builtin_ia32_pause))
546 __builtin_ia32_pause();
547#endif
548}
#define MIN(a, b)
Returns the minimum of a and b.
Definition util.h:57
int spscring_p_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_p_submit_wait().
Definition spscring.c:204
size_t spscring_c_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition spscring.c:262
int spscring_c_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition spscring.c:327
size_t spscring_p_capacity(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer,...
Definition spscring.c:65
size_t spscring_p_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a consumer and, if this satisfies a wait operation...
Definition spscring.c:138
int spscring_p_submit_wait(struct spscring *ring, size_t size, void(*func)(struct spscring *ring, void *arg), void *arg)
Checks if the requested range of indices, including wrapping, in a single-producer,...
Definition spscring.c:163
size_t spscring_c_capacity_no_wrap(struct spscring *ring)
Returns the total capacity available for a consumer in a single-producer single-consumer ring buffer,...
Definition spscring.c:247
size_t spscring_c_commit(struct spscring *ring, size_t size)
Makes the specified number of indices available to a producer and, if this satisfies a wait operation...
Definition spscring.c:302
size_t spscring_c_capacity(struct spscring *ring)
Returns the total capacity available for a consumer in a single-producer single-consumer ring buffer,...
Definition spscring.c:229
int spscring_c_abort_wait(struct spscring *ring)
Aborts a wait operation previously registered with spscring_c_submit_wait().
Definition spscring.c:368
size_t spscring_p_capacity_no_wrap(struct spscring *ring)
Returns the total capacity available for a producer in a single-producer single-consumer ring buffer,...
Definition spscring.c:83
size_t spscring_c_alloc_no_wrap(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, without wrapping, in a single-producer, single-consumer rin...
Definition spscring.c:282
void spscring_init(struct spscring *ring, size_t size)
Initializes a single-producer, single-consumer ring buffer with the specified size.
Definition spscring.c:47
size_t spscring_p_alloc_no_wrap(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, without wrapping, in a single-producer, single-consumer rin...
Definition spscring.c:118
size_t spscring_size(const struct spscring *ring)
Returns the size of a single-producer, single-consumer ring buffer.
Definition spscring.c:56
size_t spscring_p_alloc(struct spscring *ring, size_t *psize)
Allocates a consecutive range of indices, including wrapping, in a single-producer,...
Definition spscring.c:98
This header file is part of the utilities library; it contains the single-producer,...
#define SPSCRING_INIT(size)
The static initializer for spscring.
Definition spscring.h:97
This is the internal header file of the utilities library.
This header file is part of the C11 and POSIX compatibility library; it includes <stdint....
A single-producer, single-consumer ring buffer.
Definition spscring.h:63